AI 长任务最佳实践调研报告

AI 长任务最佳实践调研报告

调研日期: 2026-03-16 核心目标: 构建可靠的长时运行 AI Agent 系统


目录

  1. 核心挑战
  2. Anthropic 官方方案
  3. 状态管理策略
  4. 错误恢复与重试机制
  5. 任务编排模式
  6. 监控与可观测性
  7. 工具与框架选型
  8. 实战操作指南
  9. 最佳实践清单

一、核心挑战

1.1 长任务的困境

┌─────────────────────────────────────────────────────────────────────────┐
│                        长任务核心问题                                     │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   🧠 Context Window 限制                                                 │
│      └── 即使有 compaction,仍无法在一个窗口内完成复杂任务                  │
│                                                                         │
│   🔌 Session 断连                                                        │
│      └── 每个 Session 开始时无记忆,需要重建上下文                          │
│                                                                         │
│   🎯 进度追踪困难                                                         │
│      └── Agent 不知道已完成什么,剩余什么                                   │
│                                                                         │
│   🐛 中间状态不一致                                                       │
│      └── 任务中断后,代码/数据可能处于半完成状态                             │
│                                                                         │
│   ⏰ 时间跨度大                                                           │
│      └── 任务可能跨越数小时甚至数天                                         │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

1.2 常见失败模式

失败模式 描述 后果
One-Shot 陷阱 试图一次完成所有工作 Context 耗尽,任务半途而废
虚假完成 Agent 声称任务完成,实际未完成 功能缺失,质量不达标
状态遗忘 新 Session 忘记之前的决策和进度 重复工作,决策冲突
测试不足 标记完成但未充分验证 隐藏 Bug,功能不可用
环境混乱 中断后代码/环境处于不可用状态 下次启动需要大量修复工作

二、Anthropic 官方方案

来源: Effective harnesses for long-running agents

2.1 双 Agent 架构

Anthropic 提出的解决方案包含两个专门的 Agent:

┌─────────────────────────────────────────────────────────────────────────┐
│                        双 Agent 架构                                     │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   🚀 Initializer Agent (初始化 Agent)                                    │
│      ├── 只在第一个 Session 运行                                          │
│      ├── 创建项目基础结构                                                  │
│      ├── 生成 Feature List (功能清单)                                     │
│      ├── 创建 init.sh 启动脚本                                           │
│      ├── 创建 claude-progress.txt 进度文件                               │
│      └── 提交初始 Git Commit                                             │
│                                                                         │
│   🔧 Coding Agent (编码 Agent)                                           │
│      ├── 每个 Session 开始时运行                                          │
│      ├── 读取进度文件和 Git 历史了解状态                                    │
│      ├── 选择一个未完成的功能进行开发                                      │
│      ├── 实现功能并充分测试                                               │
│      ├── 更新进度文件和 Feature List                                      │
│      └── 提交 Git Commit 记录变更                                        │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

2.2 关键组件

Feature List (功能清单)

用 JSON 格式记录所有功能及其状态:

{
  "category": "functional",
  "description": "New chat button creates a fresh conversation",
  "steps": [
    "Navigate to main interface",
    "Click the 'New Chat' button",
    "Verify a new conversation is created",
    "Check that chat area shows welcome state",
    "Verify conversation appears in sidebar"
  ],
  "passes": false
}

为什么用 JSON 而非 Markdown? - 模型更不容易随意修改 JSON 文件 - 结构化数据更易于程序化处理 - 减少误操作删除内容的风险

Progress File (进度文件)

claude-progress.txt 记录每个 Session 的工作摘要:

# Progress Log

## Session 2026-03-16 10:30
- Implemented user authentication
- Added login/logout functionality
- Fixed session management bug
- Status: Auth module complete

## Session 2026-03-16 14:00
- Started work on chat interface
- Created message component
- Pending: WebSocket integration

Init Script (启动脚本)

init.sh 确保环境可重复启动:

#!/bin/bash
# init.sh - 开发环境启动脚本

# 安装依赖
npm install

# 设置环境变量
export NODE_ENV=development
export DATABASE_URL=postgresql://localhost:5432/dev

# 启动开发服务器
npm run dev

# 运行基础测试
npm run test:basic

2.3 Coding Agent Session 启动流程

每个 Session 的标准启动流程:

Step 1: pwd
        └── 确认当前工作目录

Step 2: 读取进度文件
        └── 了解最近完成的工作

Step 3: 读取 Git Log
        └── git log --oneline -20

Step 4: 读取 Feature List
        └── 找出下一个未完成的功能

Step 5: 运行 init.sh
        └── 启动开发服务器

Step 6: 基础功能验证
        └── 使用浏览器自动化测试核心功能

Step 7: 开始开发新功能
        └── 一次只做一个功能

2.4 Agent 行为规范

问题 Initializer Agent 行为 Coding Agent 行为
过早声明完成 创建详细的 Feature List 开始时读取列表,选择一个功能
环境状态混乱 初始化 Git 和进度文件 开始时读取进度和 Git Log,结束时提交
功能标记过早完成 设置 Feature List 必须充分测试后才能标记 passes: true
不知道如何运行 编写 init.sh 开始时读取并执行 init.sh

三、状态管理策略

3.1 状态持久化模式

┌─────────────────────────────────────────────────────────────────────────┐
│                        状态持久化模式                                     │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   📸 Snapshot-Based (快照模式)                                           │
│      ├── 定期保存完整状态快照                                             │
│      ├── 恢复时从最近的快照开始                                           │
│      └── 适合: 有明确检查点的任务                                         │
│                                                                         │
│   📝 Log-Based (日志模式)                                                 │
│      ├── 记录所有状态变更事件                                             │
│      ├── 通过重放日志重建状态                                             │
│      └── 适合: 需要完整审计追踪的场景                                     │
│                                                                         │
│   🔄 Hybrid (混合模式)                                                    │
│      ├── 定期快照 + 增量日志                                              │
│      ├── 快速恢复 + 完整历史                                              │
│      └── 适合: 大多数生产场景                                             │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

3.2 状态数据结构

# 推荐的状态数据结构

class AgentState:
    """Agent 状态结构"""

    # 1. 任务元信息
    task_id: str              # 任务唯一标识
    task_goal: str            # 任务目标描述
    created_at: datetime      # 创建时间
    updated_at: datetime      # 更新时间

    # 2. 执行状态
    status: str               # pending | running | paused | completed | failed
    current_step: int         # 当前步骤编号
    total_steps: int          # 总步骤数

    # 3. 进度追踪
    completed_steps: list     # 已完成步骤列表
    pending_steps: list       # 待完成步骤列表
    failed_steps: list        # 失败步骤列表

    # 4. 上下文数据
    context: dict             # 任务上下文数据
    decisions: list           # 决策历史
    artifacts: dict           # 产出物引用

    # 5. 错误处理
    errors: list              # 错误历史
    retry_count: int          # 重试次数
    last_checkpoint: datetime # 最后检查点时间

3.3 状态存储方案

存储方案 适用场景 优点 缺点
文件系统 单机开发 简单、无依赖 不支持分布式
SQLite 中等规模 结构化查询 单机限制
PostgreSQL 生产环境 可靠、可扩展 需要运维
Redis 高频更新 快速、支持 TTL 需要持久化配置
云存储 (S3) 大文件/备份 无限容量 延迟较高

3.4 Checkpointing 最佳实践

class CheckpointManager:
    """检查点管理器"""

    def __init__(self, checkpoint_dir: str, interval: int = 300):
        self.checkpoint_dir = checkpoint_dir
        self.interval = interval  # 秒
        self.last_checkpoint = None

    def should_checkpoint(self) -> bool:
        """判断是否需要创建检查点"""
        if self.last_checkpoint is None:
            return True

        elapsed = time.time() - self.last_checkpoint
        return elapsed >= self.interval

    def save_checkpoint(self, state: AgentState) -> str:
        """保存检查点"""
        checkpoint_id = f"ckpt_{int(time.time())}"
        checkpoint_path = f"{self.checkpoint_dir}/{checkpoint_id}.json"

        with open(checkpoint_path, 'w') as f:
            json.dump(state.to_dict(), f)

        self.last_checkpoint = time.time()
        return checkpoint_id

    def load_latest_checkpoint(self) -> AgentState:
        """加载最新检查点"""
        checkpoints = sorted(Path(self.checkpoint_dir).glob("ckpt_*.json"))
        if not checkpoints:
            return None

        with open(checkpoints[-1], 'r') as f:
            return AgentState.from_dict(json.load(f))

    def cleanup_old_checkpoints(self, keep: int = 5):
        """清理旧检查点"""
        checkpoints = sorted(Path(self.checkpoint_dir).glob("ckpt_*.json"))
        for ckpt in checkpoints[:-keep]:
            ckpt.unlink()

四、错误恢复与重试机制

4.1 重试策略

指数退避 (Exponential Backoff)

import random
import time

def exponential_backoff(
    attempt: int,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True
) -> float:
    """
    指数退避算法

    Args:
        attempt: 当前尝试次数 (从 0 开始)
        base_delay: 基础延迟 (秒)
        max_delay: 最大延迟 (秒)
        jitter: 是否添加随机抖动

    Returns:
        等待时间 (秒)
    """
    delay = min(base_delay * (2 ** attempt), max_delay)

    if jitter:
        # 添加 0.5x - 1.5x 的随机抖动
        delay = delay * (0.5 + random.random())

    return delay

# 使用示例
for attempt in range(5):
    try:
        result = api_call()
        break
    except Exception as e:
        if attempt == 4:
            raise
        wait_time = exponential_backoff(attempt)
        time.sleep(wait_time)

重试装饰器

from functools import wraps
import logging

logger = logging.getLogger(__name__)

def retry(
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exceptions: tuple = (Exception,)
):
    """
    重试装饰器

    Args:
        max_attempts: 最大尝试次数
        base_delay: 基础延迟
        max_delay: 最大延迟
        exceptions: 需要重试的异常类型
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e

                    if attempt == max_attempts - 1:
                        logger.error(f"所有重试失败: {func.__name__}")
                        raise

                    wait_time = exponential_backoff(attempt, base_delay, max_delay)
                    logger.warning(
                        f"{func.__name__} 失败 (尝试 {attempt + 1}/{max_attempts}), "
                        f"{wait_time:.1f}秒后重试: {e}"
                    )
                    time.sleep(wait_time)

            raise last_exception

        return wrapper
    return decorator

# 使用示例
@retry(max_attempts=5, exceptions=(ConnectionError, TimeoutError))
def call_llm_api(prompt: str) -> str:
    return api_client.generate(prompt)

4.2 故障分类与处理

from enum import Enum
from dataclasses import dataclass

class ErrorType(Enum):
    """错误类型分类"""
    TRANSIENT = "transient"      # 暂时性错误,可重试
    PERMANENT = "permanent"      # 永久性错误,需要人工干预
    RESOURCE = "resource"        # 资源限制,需要等待
    LOGIC = "logic"              # 逻辑错误,需要修改代码

@dataclass
class ErrorContext:
    """错误上下文"""
    error_type: ErrorType
    message: str
    recoverable: bool
    suggested_action: str
    retry_after: float | None = None

def classify_error(error: Exception) -> ErrorContext:
    """错误分类器"""
    error_str = str(error).lower()

    # Rate Limit
    if "rate limit" in error_str or "429" in error_str:
        return ErrorContext(
            error_type=ErrorType.RESOURCE,
            message=str(error),
            recoverable=True,
            suggested_action="等待后重试",
            retry_after=60.0
        )

    # Timeout
    if "timeout" in error_str or "timed out" in error_str:
        return ErrorContext(
            error_type=ErrorType.TRANSIENT,
            message=str(error),
            recoverable=True,
            suggested_action="重试请求"
        )

    # Auth Error
    if "unauthorized" in error_str or "401" in error_str or "403" in error_str:
        return ErrorContext(
            error_type=ErrorType.PERMANENT,
            message=str(error),
            recoverable=False,
            suggested_action="检查 API Key 或权限"
        )

    # Default
    return ErrorContext(
        error_type=ErrorType.TRANSIENT,
        message=str(error),
        recoverable=True,
        suggested_action="重试"
    )

4.3 Circuit Breaker (断路器)

from enum import Enum
import threading
import time

class CircuitState(Enum):
    CLOSED = "closed"      # 正常状态
    OPEN = "open"          # 断开状态 (拒绝请求)
    HALF_OPEN = "half_open"  # 半开状态 (试探性恢复)

class CircuitBreaker:
    """
    断路器模式

    防止级联故障,当错误率达到阈值时自动断开
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        success_threshold: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold

        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None
        self.lock = threading.Lock()

    def can_execute(self) -> bool:
        """检查是否可以执行"""
        with self.lock:
            if self.state == CircuitState.CLOSED:
                return True

            if self.state == CircuitState.OPEN:
                # 检查是否可以进入半开状态
                if time.time() - self.last_failure_time >= self.recovery_timeout:
                    self.state = CircuitState.HALF_OPEN
                    self.success_count = 0
                    return True
                return False

            # HALF_OPEN
            return True

    def record_success(self):
        """记录成功"""
        with self.lock:
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.success_threshold:
                    self.state = CircuitState.CLOSED
                    self.failure_count = 0

    def record_failure(self):
        """记录失败"""
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.time()

            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN

# 使用示例
circuit_breaker = CircuitBreaker()

def call_with_circuit_breaker(func, *args, **kwargs):
    if not circuit_breaker.can_execute():
        raise Exception("Circuit breaker is open")

    try:
        result = func(*args, **kwargs)
        circuit_breaker.record_success()
        return result
    except Exception as e:
        circuit_breaker.record_failure()
        raise

五、任务编排模式

5.1 工作流模式分类

参考: AI SDK Workflow Patterns

┌─────────────────────────────────────────────────────────────────────────┐
│                        工作流编排模式                                     │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   📦 Sequential (顺序处理)                                               │
│      Step A → Step B → Step C → Done                                    │
│      适合: 有明确依赖关系的任务                                           │
│                                                                         │
│   🔀 Routing (路由分发)                                                  │
│      Input → [Classifier] → Agent A / Agent B / Agent C                 │
│      适合: 需要根据输入选择不同处理逻辑                                    │
│                                                                         │
│   ⚡ Parallel (并行处理)                                                 │
│      Input → [Agent A, Agent B, Agent C] → [Aggregator] → Output        │
│      适合: 可独立执行的子任务                                             │
│                                                                         │
│   🎭 Orchestrator-Worker (编排者-工作者)                                 │
│      Orchestrator 分解任务 → Workers 执行 → Orchestrator 汇总            │
│      适合: 大型复杂任务                                                   │
│                                                                         │
│   🔄 Evaluator-Optimizer (评估-优化)                                     │
│      Worker 产出 → Evaluator 评估 → 反馈 → Worker 改进                   │
│      适合: 需要迭代改进的任务                                             │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

5.2 状态机模式

from enum import Enum
from typing import Callable, Dict, Optional
import logging

logger = logging.getLogger(__name__)

class TaskState(Enum):
    """任务状态"""
    PENDING = "pending"
    INITIALIZING = "initializing"
    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

class StateMachine:
    """
    状态机任务编排器

    定义清晰的状态转换规则,确保任务按预期流程执行
    """

    # 允许的状态转换
    TRANSITIONS: Dict[TaskState, set] = {
        TaskState.PENDING: {TaskState.INITIALIZING, TaskState.CANCELLED},
        TaskState.INITIALIZING: {TaskState.RUNNING, TaskState.FAILED, TaskState.CANCELLED},
        TaskState.RUNNING: {TaskState.PAUSED, TaskState.COMPLETED, TaskState.FAILED, TaskState.CANCELLED},
        TaskState.PAUSED: {TaskState.RUNNING, TaskState.CANCELLED},
        TaskState.COMPLETED: set(),
        TaskState.FAILED: {TaskState.PENDING},  # 允许重试
        TaskState.CANCELLED: set(),
    }

    def __init__(self):
        self.state = TaskState.PENDING
        self.handlers: Dict[TaskState, Callable] = {}
        self.history: list = []

    def register_handler(self, state: TaskState, handler: Callable):
        """注册状态处理器"""
        self.handlers[state] = handler

    def transition(self, new_state: TaskState) -> bool:
        """状态转换"""
        if new_state not in self.TRANSITIONS[self.state]:
            logger.error(f"无效的状态转换: {self.state} → {new_state}")
            return False

        old_state = self.state
        self.state = new_state
        self.history.append({
            "from": old_state,
            "to": new_state,
            "timestamp": time.time()
        })

        logger.info(f"状态转换: {old_state} → {new_state}")
        return True

    def execute(self, context: dict) -> dict:
        """执行当前状态的处理逻辑"""
        if self.state not in self.handlers:
            raise ValueError(f"未注册处理器: {self.state}")

        handler = self.handlers[self.state]
        return handler(context)

    def can_resume(self) -> bool:
        """是否可以恢复执行"""
        return self.state in {TaskState.PAUSED, TaskState.FAILED}

    def get_history(self) -> list:
        """获取状态历史"""
        return self.history.copy()

5.3 步骤编排器

from dataclasses import dataclass, field
from typing import Any, Callable, List, Optional
import json
from pathlib import Path

@dataclass
class Step:
    """任务步骤"""
    id: str
    name: str
    handler: Callable
    dependencies: List[str] = field(default_factory=list)
    retry_count: int = 0
    max_retries: int = 3
    timeout: float = 300.0
    status: str = "pending"  # pending | running | completed | failed | skipped

@dataclass
class StepResult:
    """步骤执行结果"""
    step_id: str
    success: bool
    output: Any = None
    error: Optional[str] = None
    duration: float = 0.0

class StepOrchestrator:
    """
    步骤编排器

    管理多步骤任务的执行,支持依赖关系、重试和状态持久化
    """

    def __init__(self, state_file: str = "task_state.json"):
        self.steps: Dict[str, Step] = {}
        self.results: Dict[str, StepResult] = {}
        self.state_file = Path(state_file)
        self.context: dict = {}

    def add_step(self, step: Step):
        """添加步骤"""
        self.steps[step.id] = step

    def load_state(self):
        """加载状态"""
        if self.state_file.exists():
            with open(self.state_file, 'r') as f:
                data = json.load(f)

            # 恢复步骤状态
            for step_id, status in data.get("step_status", {}).items():
                if step_id in self.steps:
                    self.steps[step_id].status = status

            # 恢复上下文
            self.context = data.get("context", {})
            logger.info(f"已加载状态: {len(self.steps)} 个步骤")

    def save_state(self):
        """保存状态"""
        data = {
            "step_status": {
                step_id: step.status
                for step_id, step in self.steps.items()
            },
            "context": self.context,
            "updated_at": time.time()
        }

        with open(self.state_file, 'w') as f:
            json.dump(data, f, indent=2)

    def get_ready_steps(self) -> List[Step]:
        """获取可执行的步骤"""
        ready = []
        for step in self.steps.values():
            if step.status != "pending":
                continue

            # 检查依赖是否完成
            deps_completed = all(
                self.steps[dep_id].status == "completed"
                for dep_id in step.dependencies
                if dep_id in self.steps
            )

            if deps_completed:
                ready.append(step)

        return ready

    def execute_step(self, step: Step) -> StepResult:
        """执行单个步骤"""
        import time
        start_time = time.time()

        try:
            step.status = "running"
            self.save_state()

            output = step.handler(self.context)

            step.status = "completed"
            result = StepResult(
                step_id=step.id,
                success=True,
                output=output,
                duration=time.time() - start_time
            )

        except Exception as e:
            step.retry_count += 1

            if step.retry_count >= step.max_retries:
                step.status = "failed"
            else:
                step.status = "pending"  # 允许重试

            result = StepResult(
                step_id=step.id,
                success=False,
                error=str(e),
                duration=time.time() - start_time
            )

        self.results[step.id] = result
        self.save_state()
        return result

    def run(self) -> bool:
        """运行整个工作流"""
        self.load_state()

        while True:
            ready_steps = self.get_ready_steps()

            if not ready_steps:
                # 检查是否全部完成
                all_done = all(
                    s.status in ("completed", "skipped")
                    for s in self.steps.values()
                )

                if all_done:
                    logger.info("所有步骤完成")
                    return True

                # 检查是否有失败的步骤
                has_failed = any(
                    s.status == "failed"
                    for s in self.steps.values()
                )

                if has_failed:
                    logger.error("存在失败的步骤")
                    return False

                # 等待中的步骤
                logger.info("等待步骤完成...")
                time.sleep(1)
                continue

            # 执行就绪的步骤
            for step in ready_steps:
                logger.info(f"执行步骤: {step.name}")
                result = self.execute_step(step)

                if result.success:
                    logger.info(f"步骤完成: {step.name}")
                else:
                    logger.error(f"步骤失败: {step.name} - {result.error}")

        return False

六、监控与可观测性

6.1 可观测性三支柱

┌─────────────────────────────────────────────────────────────────────────┐
│                        可观测性三支柱                                     │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   📊 Metrics (指标)                                                      │
│      ├── 任务完成率                                                      │
│      ├── 平均执行时间                                                    │
│      ├── 错误率                                                          │
│      ├── Token 消耗                                                      │
│      └── 资源使用率                                                      │
│                                                                         │
│   📝 Logs (日志)                                                         │
│      ├── 结构化日志                                                      │
│      ├── 任务执行轨迹                                                    │
│      ├── 错误堆栈                                                        │
│      └── 决策记录                                                        │
│                                                                         │
│   🔍 Traces (追踪)                                                       │
│      ├── 跨步骤追踪                                                      │
│      ├── API 调用链                                                      │
│      ├── 依赖关系图                                                      │
│      └── 性能瓶颈定位                                                    │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

6.2 结构化日志

import json
import logging
from datetime import datetime
from typing import Any

class StructuredLogger:
    """结构化日志记录器"""

    def __init__(self, task_id: str, log_file: str = "task.log"):
        self.task_id = task_id
        self.log_file = log_file
        self.handlers = []

    def log(
        self,
        level: str,
        message: str,
        **kwargs: Any
    ):
        """记录结构化日志"""
        entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "task_id": self.task_id,
            "level": level,
            "message": message,
            **kwargs
        }

        # 写入文件
        with open(self.log_file, 'a') as f:
            f.write(json.dumps(entry) + '\n')

        # 控制台输出
        log_method = getattr(logging, level.lower(), logging.info)
        log_method(f"[{self.task_id}] {message}")

    def info(self, message: str, **kwargs):
        self.log("INFO", message, **kwargs)

    def error(self, message: str, **kwargs):
        self.log("ERROR", message, **kwargs)

    def warning(self, message: str, **kwargs):
        self.log("WARNING", message, **kwargs)

    def step_start(self, step_id: str, step_name: str):
        """记录步骤开始"""
        self.log("INFO", f"步骤开始: {step_name}", step_id=step_id, event="step_start")

    def step_end(self, step_id: str, step_name: str, success: bool, duration: float):
        """记录步骤结束"""
        self.log(
            "INFO" if success else "ERROR",
            f"步骤{'完成' if success else '失败'}: {step_name}",
            step_id=step_id,
            event="step_end",
            success=success,
            duration_seconds=duration
        )

    def decision(self, decision: str, reason: str):
        """记录决策"""
        self.log("INFO", f"决策: {decision}", event="decision", reason=reason)

    def checkpoint(self, checkpoint_id: str, state: dict):
        """记录检查点"""
        self.log("INFO", f"创建检查点", event="checkpoint", checkpoint_id=checkpoint_id)

6.3 进度仪表盘

from dataclasses import dataclass
from typing import Dict, List
import json

@dataclass
class TaskProgress:
    """任务进度"""
    task_id: str
    status: str
    total_steps: int
    completed_steps: int
    failed_steps: int
    current_step: str
    started_at: str
    estimated_completion: str | None
    progress_percent: float

class ProgressDashboard:
    """
    进度仪表盘

    生成任务进度的可视化报告
    """

    def __init__(self, state_file: str):
        self.state_file = state_file

    def get_progress(self) -> TaskProgress:
        """获取进度信息"""
        with open(self.state_file, 'r') as f:
            state = json.load(f)

        step_status = state.get("step_status", {})
        total = len(step_status)
        completed = sum(1 for s in step_status.values() if s == "completed")
        failed = sum(1 for s in step_status.values() if s == "failed")

        # 找到当前步骤
        current = next(
            (k for k, v in step_status.items() if v == "running"),
            "N/A"
        )

        progress = (completed / total * 100) if total > 0 else 0

        return TaskProgress(
            task_id=state.get("task_id", "unknown"),
            status=state.get("status", "unknown"),
            total_steps=total,
            completed_steps=completed,
            failed_steps=failed,
            current_step=current,
            started_at=state.get("started_at", "unknown"),
            estimated_completion=None,  # TODO: 实现预估
            progress_percent=round(progress, 1)
        )

    def render(self) -> str:
        """渲染进度报告"""
        progress = self.get_progress()

        # 进度条
        bar_length = 40
        filled = int(bar_length * progress.progress_percent / 100)
        bar = "█" * filled + "░" * (bar_length - filled)

        return f"""
┌─────────────────────────────────────────────────────────────┐
│                     任务进度报告                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Task ID: {progress.task_id:<42} │
│  Status: {progress.status:<43} │
│                                                             │
│  [{bar}] {progress.progress_percent:.1f}%   │
│                                                             │
│  Steps: {progress.completed_steps}/{progress.total_steps} completed, {progress.failed_steps} failed        │
│  Current: {progress.current_step:<42} │
│                                                             │
│  Started: {progress.started_at:<41} │
│                                                             │
└─────────────────────────────────────────────────────────────┘
"""

七、工具与框架选型

7.1 长任务框架对比

框架 类型 特点 适用场景
Temporal 工作流引擎 持久化执行、自动重试、长运行支持 企业级生产环境
LangGraph Agent 框架 状态图、检查点、PostgreSQL 持久化 LLM 工作流
Inngest Serverless 事件驱动、自动重试、Durable Functions Serverless 架构
Restate Durable Execution 轻量级、支持 LLM、故障恢复 现代 AI 应用
Claude Agent SDK Agent SDK 内置 Todo、Compaction、Hooks Claude 生态
自建方案 自定义 完全控制、轻量 学习/简单场景

7.2 Temporal 示例

# Temporal 工作流示例
from datetime import timedelta
from temporalio import workflow, activity

@workflow.defn
class LongRunningTaskWorkflow:
    """长任务工作流"""

    @workflow.run
    async def run(self, task_config: dict) -> dict:
        # Step 1: 初始化
        init_result = await workflow.execute_activity(
            "initialize_task",
            task_config,
            start_to_close_timeout=timedelta(minutes=5)
        )

        # Step 2: 执行子任务 (可并行)
        sub_results = await workflow.execute_activity(
            "execute_subtasks",
            init_result,
            start_to_close_timeout=timedelta(hours=2)
        )

        # Step 3: 汇总结果
        final_result = await workflow.execute_activity(
            "aggregate_results",
            sub_results,
            start_to_close_timeout=timedelta(minutes=10)
        )

        return final_result

@activity.defn
async def initialize_task(config: dict) -> dict:
    """初始化活动"""
    # 自动重试、超时处理
    return {"status": "initialized", "config": config}

7.3 LangGraph 示例

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.postgres import PostgresSaver
from typing import TypedDict

class AgentState(TypedDict):
    messages: list
    current_step: str
    results: dict

# 定义节点
def research_node(state: AgentState) -> AgentState:
    # 研究逻辑
    return {**state, "current_step": "research_done"}

def analyze_node(state: AgentState) -> AgentState:
    # 分析逻辑
    return {**state, "current_step": "analysis_done"}

def write_node(state: AgentState) -> AgentState:
    # 写作逻辑
    return {**state, "current_step": "writing_done"}

# 构建图
workflow = StateGraph(AgentState)
workflow.add_node("research", research_node)
workflow.add_node("analyze", analyze_node)
workflow.add_node("write", write_node)

workflow.add_edge("research", "analyze")
workflow.add_edge("analyze", "write")
workflow.add_edge("write", END)

workflow.set_entry_point("research")

# 添加检查点 (PostgreSQL 持久化)
checkpointer = PostgresSaver(connection_string)
app = workflow.compile(checkpointer=checkpointer)

# 运行 (支持恢复)
result = app.invoke(
    {"messages": [], "current_step": "", "results": {}},
    config={"configurable": {"thread_id": "task-123"}}
)

八、实战操作指南

8.1 长任务启动清单

在启动一个长任务前,确保以下准备工作已完成:

┌─────────────────────────────────────────────────────────────────────────┐
│                        长任务启动清单                                     │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   📋 任务设计                                                            │
│   □ 明确定义任务目标和成功标准                                            │
│   □ 将任务分解为独立、可验证的步骤                                        │
│   □ 识别步骤间的依赖关系                                                  │
│   □ 为每个步骤设定超时和重试策略                                          │
│                                                                         │
│   💾 状态管理                                                            │
│   □ 选择状态存储方案 (文件/数据库)                                        │
│   □ 设计状态数据结构                                                      │
│   □ 实现检查点保存逻辑                                                    │
│   □ 实现状态恢复逻辑                                                      │
│                                                                         │
│   🛡️ 错误处理                                                            │
│   □ 分类可能出现的错误                                                    │
│   □ 实现重试机制 (指数退避)                                               │
│   □ 实现断路器 (可选)                                                     │
│   □ 定义失败恢复策略                                                      │
│                                                                         │
│   📊 可观测性                                                            │
│   □ 配置结构化日志                                                        │
│   □ 实现进度追踪                                                          │
│   □ 设置告警规则 (可选)                                                   │
│   □ 准备进度查看方式                                                      │
│                                                                         │
│   🧪 测试                                                                │
│   □ 测试正常执行流程                                                      │
│   □ 测试中断恢复                                                          │
│   □ 测试错误处理                                                          │
│   □ 测试并发场景 (如适用)                                                 │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

8.2 推荐的项目结构

long-running-task/
├── task_config.json          # 任务配置
├── init.sh                   # 初始化脚本
├── state/
│   ├── task_state.json       # 任务状态
│   ├── checkpoints/          # 检查点目录
│   │   ├── ckpt_1710560000.json
│   │   └── ckpt_1710560300.json
│   └── progress.json         # 进度摘要
├── logs/
│   ├── task.log              # 结构化日志
│   └── decisions.log         # 决策记录
├── artifacts/                # 产出物
│   ├── report.md
│   └── data.json
├── src/
│   ├── orchestrator.py       # 任务编排器
│   ├── steps/                # 步骤实现
│   │   ├── step_1_research.py
│   │   ├── step_2_analyze.py
│   │   └── step_3_report.py
│   ├── state_manager.py      # 状态管理
│   └── logger.py             # 日志工具
├── tests/
│   ├── test_orchestrator.py
│   └── test_steps.py
└── requirements.txt

8.3 完整启动脚本

#!/usr/bin/env python3
"""
长任务启动脚本

使用方式:
    python run_task.py --config task_config.json
    python run_task.py --config task_config.json --resume
"""

import argparse
import json
import sys
from pathlib import Path
from datetime import datetime

from orchestrator import StepOrchestrator, Step
from state_manager import CheckpointManager
from logger import StructuredLogger
from dashboard import ProgressDashboard

def load_config(config_path: str) -> dict:
    """加载配置"""
    with open(config_path, 'r') as f:
        return json.load(f)

def setup_directories(task_id: str) -> dict:
    """设置目录结构"""
    base = Path("tasks") / task_id
    dirs = {
        "base": base,
        "state": base / "state",
        "checkpoints": base / "state" / "checkpoints",
        "logs": base / "logs",
        "artifacts": base / "artifacts"
    }

    for d in dirs.values():
        d.mkdir(parents=True, exist_ok=True)

    return dirs

def create_steps(config: dict) -> list[Step]:
    """创建步骤"""
    from steps import (
        step_1_research,
        step_2_analyze,
        step_3_report
    )

    steps = [
        Step(
            id="research",
            name="信息收集",
            handler=step_1_research.execute,
            max_retries=3,
            timeout=600.0
        ),
        Step(
            id="analyze",
            name="数据分析",
            handler=step_2_analyze.execute,
            dependencies=["research"],
            max_retries=2,
            timeout=1200.0
        ),
        Step(
            id="report",
            name="生成报告",
            handler=step_3_report.execute,
            dependencies=["analyze"],
            max_retries=2,
            timeout=300.0
        )
    ]

    return steps

def main():
    parser = argparse.ArgumentParser(description="长任务执行器")
    parser.add_argument("--config", required=True, help="配置文件路径")
    parser.add_argument("--resume", action="store_true", help="恢复中断的任务")
    args = parser.parse_args()

    # 加载配置
    config = load_config(args.config)
    task_id = config.get("task_id", f"task_{datetime.now().strftime('%Y%m%d_%H%M%S')}")

    # 设置目录
    dirs = setup_directories(task_id)

    # 初始化组件
    logger = StructuredLogger(
        task_id,
        log_file=str(dirs["logs"] / "task.log")
    )
    checkpoint_mgr = CheckpointManager(
        checkpoint_dir=str(dirs["checkpoints"])
    )
    orchestrator = StepOrchestrator(
        state_file=str(dirs["state"] / "task_state.json")
    )

    # 注册步骤
    for step in create_steps(config):
        orchestrator.add_step(step)

    logger.info("任务启动", config=config, resume=args.resume)

    try:
        # 执行任务
        success = orchestrator.run()

        if success:
            logger.info("任务完成")
            print("\n✅ 任务完成!")

            # 显示进度
            dashboard = ProgressDashboard(orchestrator.state_file)
            print(dashboard.render())
        else:
            logger.error("任务失败")
            print("\n❌ 任务失败,请检查日志")

            sys.exit(1)

    except KeyboardInterrupt:
        logger.warning("用户中断任务")
        orchestrator.save_state()
        print("\n⏸️ 任务已暂停,可使用 --resume 恢复")
        sys.exit(0)

    except Exception as e:
        logger.error(f"任务异常: {e}")
        orchestrator.save_state()
        print(f"\n💥 任务异常: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()

8.4 监控脚本

#!/usr/bin/env python3
"""
任务监控脚本

使用方式:
    python monitor.py --task task_20260316_143000
"""

import argparse
import time
from pathlib import Path
from dashboard import ProgressDashboard
import json

def monitor_task(task_dir: str, interval: float = 5.0):
    """监控任务进度"""
    state_file = Path(task_dir) / "state" / "task_state.json"
    log_file = Path(task_dir) / "logs" / "task.log"

    if not state_file.exists():
        print(f"❌ 任务状态文件不存在: {state_file}")
        return

    dashboard = ProgressDashboard(str(state_file))

    print("🔍 开始监控任务...")
    print("按 Ctrl+C 停止\n")

    last_lines = 0

    try:
        while True:
            # 显示进度
            print("\033[2J\033[H")  # 清屏
            print(dashboard.render())

            # 显示最近日志
            if log_file.exists():
                with open(log_file, 'r') as f:
                    lines = f.readlines()

                if len(lines) > last_lines:
                    print("\n📜 最近日志:")
                    for line in lines[-5:]:
                        try:
                            entry = json.loads(line)
                            print(f"  [{entry['level']}] {entry['message']}")
                        except:
                            pass
                    last_lines = len(lines)

            time.sleep(interval)

    except KeyboardInterrupt:
        print("\n👋 监控已停止")

def main():
    parser = argparse.ArgumentParser(description="任务监控器")
    parser.add_argument("--task", required=True, help="任务目录")
    parser.add_argument("--interval", type=float, default=5.0, help="刷新间隔(秒)")
    args = parser.parse_args()

    monitor_task(args.task, args.interval)

if __name__ == "__main__":
    main()

九、最佳实践清单

9.1 设计原则

┌─────────────────────────────────────────────────────────────────────────┐
│                        长任务设计原则                                     │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   ✅ DO (推荐)                                                          │
│                                                                         │
│   1. 小步迭代                                                            │
│      └── 每个步骤应该是原子性的,可独立验证和恢复                           │
│                                                                         │
│   2. 频繁检查点                                                          │
│      └── 在关键步骤后保存状态,减少恢复时的重复工作                         │
│                                                                         │
│   3. 幂等性设计                                                          │
│      └── 同一步骤多次执行应该产生相同结果                                   │
│                                                                         │
│   4. 明确的成功标准                                                      │
│      └── 每个步骤都有可验证的完成条件                                      │
│                                                                         │
│   5. 优雅降级                                                            │
│      └── 部分失败不影响整体,允许跳过非关键步骤                              │
│                                                                         │
│   ❌ DON'T (避免)                                                        │
│                                                                         │
│   1. 一步到位                                                            │
│      └── 避免试图在一个大步骤中完成所有工作                                 │
│                                                                         │
│   2. 状态依赖内存                                                        │
│      └── 所有状态必须持久化,不能依赖进程内存                               │
│                                                                         │
│   3. 无限重试                                                            │
│      └── 设置最大重试次数,避免死循环                                      │
│                                                                         │
│   4. 忽略错误                                                            │
│      └── 记录所有错误,即使选择继续执行                                    │
│                                                                         │
│   5. 假设网络稳定                                                        │
│      └── 所有外部调用都可能失败,需要重试机制                               │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

9.2 检查点策略

场景 检查点频率 保留策略
快速任务 (< 10 分钟) 每步完成后 保留最近 3 个
中等任务 (10-60 分钟) 每步 + 每 5 分钟 保留最近 10 个
长时任务 (> 1 小时) 每步 + 每 10 分钟 保留最近 20 个
关键任务 每步 + 每 2 分钟 保留所有 + 定期归档

9.3 错误处理策略

错误类型 处理方式 重试次数 退避策略
网络超时 自动重试 5 指数退避 (1s → 60s)
Rate Limit 等待后重试 3 固定等待 (60s)
认证失败 立即失败 0 N/A
资源不足 等待后重试 3 线性退避 (30s)
逻辑错误 记录并跳过 0 N/A

9.4 Agent 提示词模板

# 长任务 Agent 提示词模板

## 角色定义
你是一个长任务执行 Agent,负责完成复杂的、跨多个 Session 的任务。

## 核心原则

1. **增量执行**: 每次只做一个步骤,不要试图一次完成所有工作
2. **状态持久化**: 每完成一个步骤,必须更新状态文件
3. **充分测试**: 标记完成前必须验证功能正常
4. **清晰记录**: 在进度文件中记录所有决策和变更

## Session 启动流程

每个 Session 开始时,按以下步骤操作:

1. 运行 `pwd` 确认工作目录
2. 读取进度文件 (`progress.md`)
3. 读取 Git 日志 (`git log --oneline -20`)
4. 读取功能清单 (`feature_list.json`)
5. 运行 `init.sh` 启动环境
6. 执行基础验证测试
7. 选择下一个未完成的功能

## Session 结束流程

每个 Session 结束前,必须:

1. 确保代码处于可运行状态
2. 提交 Git Commit (描述性消息)
3. 更新进度文件
4. 更新功能清单状态

## 错误处理

遇到错误时:
1. 记录错误详情到日志
2. 尝试恢复到上一个稳定状态
3. 如果无法恢复,保存当前进度并报告
4. 不要跳过错误,不要标记未完成的功能为完成

## 输出格式

所有输出使用 Markdown 格式,包含:
- 当前步骤说明
- 执行的操作
- 遇到的问题
- 下一步计划

参考资源

官方文档

资源 链接
Anthropic 长任务 Agent Effective harnesses for long-running agents
Claude Agent SDK platform.claude.com
AI SDK Workflows ai-sdk.dev/docs/agents/workflows

框架与工具

工具 链接
Temporal temporal.io
LangGraph langchain-ai.github.io/langgraph
Inngest inngest.com
Restate restate.dev

社区资源

资源 链接
Agent Patterns thegroundtruth.substack.com
State Management nookplot.com
Durable Execution restate.dev/blog

报告生成时间: 2026-03-16 基于 Anthropic 官方文档及社区最佳实践整理