Learn Claude Code · Session 11

ERROR RECOVERY

三条恢复路径 + 指数退避 — 相比 s10 的新增内容解析
§0 整体功能 · Overview

解决的问题

s10 的 LLM 调用没有任何容错:一旦 API 返回 429(限速)、529(过载)、max_tokens(截断)或 prompt_too_long(上下文超限),程序直接抛异常崩溃。

s11 为每类错误设计了独立的恢复策略,使 agent 在各类瞬态故障下能自动恢复继续工作。

三条恢复路径

  1. Path 1 — max_tokens:输出被截断 → 先升级到 64K tokens 重试;仍截断则追加续写提示(最多 3 次)
  2. Path 2 — prompt_too_long:上下文超限 → reactive compact 压缩消息列表 → 重试一次
  3. Path 3 — 429/529:速率限制/过载 → 指数退避 + 随机抖动;连续 529 达阈值时切换备用模型

关键新增 vs s10

  • 6 个新常量(ESCALATED_MAX_TOKENS 等)
  • RecoveryState 数据类 — 跨循环追踪恢复状态
  • retry_delay(attempt, retry_after) — 指数退避计算
  • with_retry(fn, state) — 429/529 重试包装
  • is_prompt_too_long_error(e) — 错误分类
  • reactive_compact(messages) — 紧急压缩
  • agent_loop 内部重构:try/except 包裹 LLM 调用

错误处理决策树(简化)

ASCII Flow LLM 调用 ├─ 成功 → stop_reason? │ ├─ "max_tokens" → has_escalated? │ │ ├─ No → 升级到 64K,continue │ │ └─ Yes → recovery_count < 3? │ │ ├─ Yes → 追加 CONTINUATION_PROMPT,continue │ │ └─ No → 放弃,return │ ├─ "tool_use" → 执行工具,继续循环 │ └─ 其他 → 正常返回 └─ 异常 → with_retry 已处理 429/529 → 外层 except: ├─ prompt_too_long? → reactive_compact (一次) → continue └─ 其他 → 记录错误,追加错误消息,return

§1 新增常量
代码行语法注解
PRIMARY_MODEL = os.environ["MODEL_ID"] s10 用 MODEL,s11 改名为 PRIMARY_MODEL 以区分主/备用模型
FALLBACK_MODEL = os.getenv("FALLBACK_MODEL_ID") os.getenv()(非 os.environ[]):键不存在时返回 None 而非抛异常;备用模型可选配置
ESCALATED_MAX_TOKENS = 64000 Path 1 第一步:将 max_tokens 从 8K 升级到 64K 重试
DEFAULT_MAX_TOKENS = 8000 默认值;拆出为常量便于统一修改
MAX_RECOVERY_RETRIES = 3 Path 1 续写上限:最多 3 次 continuation prompt
MAX_RETRIES = 10 Path 3 退避最大次数:超过后抛 RuntimeError
BASE_DELAY_MS = 500 退避基础延迟(毫秒);第 n 次等待 = min(500 * 2^n, 32000) / 1000 秒
MAX_CONSECUTIVE_529 = 3 连续 529 超过 3 次时触发模型切换(若配置了备用模型)
CONTINUATION_PROMPT = ( 多行字符串常量(括号内隐式连接);Path 1 续写时追加给 Claude 的提示
"Output token limit hit. Resume directly — " 告知 Claude 之前被截断,要求从断点继续,不重复
"no apology, no recap. Pick up mid-thought." 隐式字符串拼接(两个相邻字符串字面量自动合并)
) 括号闭合;等价于 CONTINUATION_PROMPT = "Output token limit hit. Resume directly — no apology, no recap. Pick up mid-thought."
退避延迟计算示例 BASE_DELAY_MS = 500 attempt=0: min(500*1, 32000)/1000 = 0.5s + jitter attempt=1: min(500*2, 32000)/1000 = 1.0s + jitter attempt=2: min(500*4, 32000)/1000 = 2.0s + jitter attempt=3: min(500*8, 32000)/1000 = 4.0s + jitter attempt=5: min(500*32, 32000)/1000 = 16.0s + jitter attempt=6: min(500*64, 32000)/1000 = 32.0s (上限)
为何需要 FALLBACK_MODEL?
Claude API 429 = 请求超过速率限制(退避可解决);529 = 服务过载(退避通常有效,但严重过载时切换到另一个模型端点成功率更高)。 FALLBACK_MODEL_ID 未配置时,系统继续退避而非崩溃。

§2 RecoveryState — 跨循环状态追踪

逐行注解

代码行语法注解
class RecoveryState: 普通 Python 类(非 dataclass);无需字段声明,所有状态在 __init__ 中赋值
"""Track recovery attempts across the loop.""" 类文档字符串(docstring);位于类体第一行,help(RecoveryState) 可见
def __init__(self): 构造方法;self 指向实例,Python 约定第一参数名为 self
self.has_escalated = False Path 1 标志:是否已将 max_tokens 从 8K 升级到 64K;每个 agent_loop 调用独立
self.recovery_count = 0 Path 1 计数:已发送多少次 continuation prompt;上限 MAX_RECOVERY_RETRIES=3
self.consecutive_529 = 0 Path 3 计数:连续 529 次数;成功后归零(在 with_retry 中)
self.has_attempted_reactive_compact = False Path 2 标志:是否已尝试过 reactive_compact;限制仅重试一次
self.current_model = PRIMARY_MODEL 当前使用的模型 ID;连续 529 时切换为 FALLBACK_MODEL

为何需要状态对象?

agent_loop 是一个 while True 循环,每次迭代可能触发不同类型的错误。如果用局部变量追踪状态,跨迭代传递困难。

将所有恢复状态封装到 RecoveryState 实例中,每次 agent_loop 调用创建一个新实例,确保不同对话轮次相互隔离。

状态生命周期 def agent_loop(messages, context): state = RecoveryState() # 每次调用新建 max_tokens = DEFAULT_MAX_TOKENS # 8000 while True: # state 贯穿整个 while 循环 # state.current_model 可能被 with_retry 修改 # state.has_escalated 控制 Path 1 行为 ...

§3 retry_delay(attempt, retry_after) — 指数退避计算

逐行注解

代码行语法注解
def retry_delay(attempt, retry_after=None): 默认参数 retry_after=None;API 响应头中的 Retry-After 值(秒),优先级最高
if retry_after: 若 API 提供了 Retry-After,直接使用(服务端知道何时可以重试)
return retry_after 早返回;跳过下方计算
base = min(BASE_DELAY_MS * (2 ** attempt), 32000) / 1000 2 ** attempt:指数增长;min(..., 32000):上限 32 秒;/ 1000:毫秒转秒
jitter = random.uniform(0, base * 0.25) random.uniform(a, b):返回 [a, b] 均匀随机浮点数;抖动为 base 的 0~25%;防止多个客户端同时重试(雷群效应)
return base + jitter 返回秒数(浮点);调用方用 time.sleep() 等待
输入 → 输出示例 retry_delay(0) # → ~0.5–0.625s (0.5 + 0~0.125) retry_delay(1) # → ~1.0–1.25s retry_delay(2) # → ~2.0–2.5s retry_delay(5) # → ~16.0–20.0s retry_delay(7) # → ~32.0–40.0s (上限 32s + 最多 8s 抖动) # 服务端提供 Retry-After 时 retry_delay(3, retry_after=60) # → 60 (直接返回,忽略 attempt)
抖动(Jitter)的作用
无抖动时,所有被 429 的客户端会在同一时刻重试,产生再次拥堵。 加入随机抖动后,重试请求在时间上分散,降低对服务端的冲击。 业界常用 "Full Jitter" 或 "Decorrelated Jitter",此处用简化版。

§4 with_retry(fn, state) — 瞬态错误重试包装

逐行注解

代码行语法注解
def with_retry(fn, state: RecoveryState): fn 是可调用对象(callable);调用方传入 lambda,包装 API 调用
for attempt in range(MAX_RETRIES): range(10) = 0..9,最多 10 次尝试;attempt 用于退避计算
try: 每次尝试都包裹在 try/except 中
result = fn() 调用传入的函数(lambda);执行 client.messages.create()
state.consecutive_529 = 0 调用成功:重置 529 计数(连续计数只在失败时递增)
return result 成功后立即返回,跳出 for 循环
except Exception as e: 捕获所有异常;不区分 Exception 类型,通过字符串匹配分类
name = type(e).__name__ type(e):获取异常类型;.__name__:类名字符串,如 "RateLimitError"
msg = str(e).lower() 异常消息转小写,便于大小写不敏感匹配
if "ratelimit" in name.lower() or "429" in msg: 429 检测:匹配类名中的 "ratelimit" 或消息中的 "429"
delay = retry_delay(attempt) 计算本次退避时长
print(f" \033[33m[429 rate limit] retry ...") \033[33m = 黄色 ANSI;打印进度信息
time.sleep(delay) time.sleep(seconds):阻塞当前线程等待
continue continue:跳过本次循环剩余语句,进入下一次 for 迭代
if "overloaded" in name.lower() or "529" in msg or "overloaded" in msg: 529 检测:多关键字覆盖不同 SDK 版本的错误命名差异
state.consecutive_529 += 1 递增连续 529 计数(通过 state 对象跨迭代保持)
if state.consecutive_529 >= MAX_CONSECUTIVE_529: 连续 3 次 529 → 触发模型切换逻辑
if FALLBACK_MODEL: 只在配置了备用模型时切换;否则继续重试主模型
state.current_model = FALLBACK_MODEL 修改 state 中的模型 ID;下次循环的 lambda 会读取新值
state.consecutive_529 = 0 切换后重置计数,给新模型独立计数机会
# Not transient -> re-raise for outer try/except 注释说明:非 429/529 的异常(如网络错误、prompt_too_long)不在此处理
raise 无参数 raise:重新抛出当前异常,保留原始 traceback;让 agent_loop 外层 except 处理
raise RuntimeError(f"Max retries ({MAX_RETRIES}) exceeded") for 循环正常结束(没有 return/break)= 10 次全部失败;主动抛出明确错误

Lambda 包装模式

调用方写法(agent_loop 中) response = with_retry( lambda mt=max_tokens, mdl=state.current_model: client.messages.create( model=mdl, system=system, messages=messages, tools=TOOLS, max_tokens=mt), state)

lambda 默认参数捕获:mt=max_tokens 在 lambda 创建时求值(early binding),而非在调用时(late binding)。若写 lambda: client.messages.create(model=state.current_model,...)state.current_model 每次调用时再读取——这里 mdl 的行为反而是为了在每次重试时重新读取最新模型(通过 state.current_model 作为参数传入)。

注意:此处 mdl=state.current_model 在 lambda 创建时绑定,意味着切换模型后需要创建新 lambda。实际上 with_retry 内部 fn() 调用的是同一个 lambda——模型切换在 下一次 agent_loop 迭代生效。

429 处理完整时序 尝试 0: fn() → 429 → sleep(~0.5s) → continue 尝试 1: fn() → 429 → sleep(~1.0s) → continue 尝试 2: fn() → 成功 → state.consecutive_529 = 0 (已是0,不变) → return response

§5 is_prompt_too_long_error(e) — 错误分类辅助

逐行注解

代码行语法注解
def is_prompt_too_long_error(e: Exception) -> bool: 类型注解;接受任意 Exception,返回布尔值;纯函数,无副作用
msg = str(e).lower() 统一转小写,覆盖不同 SDK 版本的大小写差异
return (("prompt" in msg and "long" in msg) 模式 1:同时含 "prompt" 和 "long"(如 "prompt is too long")
or "prompt_is_too_long" in msg 模式 2:API error code 格式(下划线连接)
or "context_length_exceeded" in msg 模式 3:OpenAI 兼容格式(跨 API 兼容性)
or "max_context_window" in msg) 模式 4:另一种 Claude API 错误消息变体;括号闭合,整体是一个布尔表达式
为何需要多种模式匹配? # Anthropic SDK 可能在不同版本中用不同消息 is_prompt_too_long_error(Exception("prompt is too long")) # → True (模式 1) is_prompt_too_long_error(Exception("prompt_is_too_long")) # → True (模式 2) is_prompt_too_long_error(Exception("context_length_exceeded for model")) # → True (模式 3) is_prompt_too_long_error(Exception("429 rate limit")) # → False (与 429 区分,由 with_retry 处理) is_prompt_too_long_error(Exception("network error")) # → False (不可恢复错误)

§6 reactive_compact(messages) — 紧急消息压缩

逐行注解

代码行语法注解
def reactive_compact(messages: list) -> list: 接受消息列表,返回新的压缩消息列表;不修改原列表(纯函数)
print(" \033[31m[reactive compact] trimming to last 5 messages\033[0m") \033[31m = 红色,提示这是紧急操作
tail = messages[-5:] 切片:保留最后 5 条消息;负索引从末尾计数;若 len < 5 则保留全部
return [{"role": "user", 返回新列表;第一个元素是压缩说明消息,角色为 "user"(Claude 期望 user/assistant 交替)
"content": "[Reactive compact] Earlier conversation trimmed. " 告知 Claude 历史已被截断,避免 Claude 困惑于上下文缺失
"Continue from where you left off."}, *tail] *tail:解包列表,将 tail 的元素展开到外层列表中;结果:[说明消息, msg-N-4, msg-N-3, msg-N-2, msg-N-1, msg-N]

教学简化 vs 生产版本

生产 Claude Code 的 reactive compact 会调用 LLM 生成对话摘要,然后将摘要 + 最近几条消息重组为新列表(s08/s09 中的 compact_history() 模式)。

s11 为了聚焦于错误恢复逻辑,简化为直接保留最后 5 条消息,用说明文字代替摘要。

输入 → 输出 # 输入: 20 条历史消息 messages = [m0, m1, m2, ..., m19] # reactive_compact 后 result = [ {"role":"user","content":"[Reactive compact]..."}, m15, m16, m17, m18, m19 # 最后 5 条 ] # 总计 6 条,大幅降低 token 数

§7 agent_loop — 三路错误处理集成

关键变更代码注解

代码行(s11 新增/变更)说明
state = RecoveryState() 每次 agent_loop 调用创建新状态;多轮对话间相互独立
max_tokens = DEFAULT_MAX_TOKENS 本地变量追踪当前 max_tokens;Path 1 升级时修改
try: 包裹整个 with_retry 调用;捕获 with_retry 传递上来的非瞬态异常
response = with_retry( with_retry 处理 429/529;成功则返回 response;失败则 raise
lambda mt=max_tokens, mdl=state.current_model: lambda 默认参数在创建时绑定当前值;mdl 绑定此时的 current_model
client.messages.create(..., max_tokens=mt), 将 max_tokens 和 model 作为 lambda 参数传入,确保可测试性
except Exception as e: 外层 except:处理 with_retry 未处理的异常(prompt_too_long、不可恢复错误)
if is_prompt_too_long_error(e): Path 2:分类检查
if not state.has_attempted_reactive_compact: 仅重试一次:防止无限 compact 循环
messages[:] = reactive_compact(messages) messages[:] 就地替换列表内容(不改变引用);调用方的 history 列表也同步更新
state.has_attempted_reactive_compact = True 标记已尝试,防止二次触发
continue 跳回 while True 顶部,以压缩后的消息重新发起 LLM 调用
if response.stop_reason == "max_tokens": Path 1:区分 stop_reason 而非异常;被截断时 stop_reason 为 "max_tokens"
if not state.has_escalated: 第一次截断:不追加截断输出,直接升级 token 预算
max_tokens = ESCALATED_MAX_TOKENS 修改本地变量;下一次循环的 lambda 重新创建时会捕获新值
state.has_escalated = True 标记已升级,防止重复升级
continue 用新 max_tokens 重试同一请求
messages.append({"role": "assistant", "content": response.content}) 64K 仍截断:保存截断输出,追加续写提示让 Claude 继续
if state.recovery_count < MAX_RECOVERY_RETRIES: 最多 3 次续写;超过后 return(不强制继续,避免死循环)
messages.append({"role": "user", "content": CONTINUATION_PROMPT}) 注入续写 prompt;下次循环 LLM 会从断点继续

Path 1 完整时序示例

输出截断恢复 # 第 1 次 LLM 调用 response.stop_reason == "max_tokens" has_escalated = False → max_tokens = 64000, has_escalated = True → continue (重试,不追加截断输出) # 第 2 次 LLM 调用 (64K) response.stop_reason == "max_tokens" # 仍截断 has_escalated = True → 追加截断输出 → recovery_count = 0 < 3 → 追加 CONTINUATION_PROMPT → recovery_count = 1, continue # 第 3 次 LLM 调用 response.stop_reason == "end_turn" # 成功 → 正常流程

Path 2 完整时序示例

上下文超限恢复 # 第 1 次调用抛 prompt_too_long is_prompt_too_long_error → True has_attempted_reactive_compact = False → messages[:] = reactive_compact(messages) → has_attempted_reactive_compact = True → continue # 第 2 次调用(压缩后) → 成功(通常消息大幅缩短) # 若第 2 次仍 prompt_too_long has_attempted_reactive_compact = True → 追加错误消息,return(放弃)