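S20: Comprehensive Agent (Code Walkthrough)

Final chapter: integrates every teaching component from S01-S19 into one complete Agent Loop. Compared with S19, S20 adds: Skill loading, edit_file/glob/todo_write, Hooks plus a permission layer, Subagent, a context compression system, error recovery, re-integrated Cron, a plan approval gate that really blocks, terminal_print, and cron_autorun_loop.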


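1. Overview

01 Where S20 fits

S19 can already extend its tool set dynamically through MCP, but it lacks core capabilities from earlier chapters: permission control, context compression, error recovery, Skill loading, and so on. S20 is the "comprehensive version": it folds the capabilities of every teaching chapter into a single file, as a complete reference for studying the Claude Code architecture.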

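02 Main modules added relative to S19

terminal_print: thread-safe printing
SKILL_REGISTRY + scan_skills: Skill file scanning and loading
run_edit / run_glob / run_todo_write: 3 new tools
HOOKS + permission_hook: hook system + permission pipeline
spawn_subagent: isolated subagent
Context compression system: tool_result_budget / snip / micro / compact
Error recovery: RecoveryState + with_retry
Cron re-integrated (S14 material)
cron_autorun_loop: background thread that runs cron jobs automatically
Real plan approval gate: protocol_ctx["waiting_plan"]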

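03 Architecture layers

The S20 Agent Loop has 5 phases:
1. Inject Cron and background results
2. Preprocess the context (compression)
3. Call the LLM (with error recovery)
4. Execute tools (including permission hooks and background task dispatch)
5. Merge results back into messages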


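2. New constants

    PRIMARY_MODEL = MODEL
    FALLBACK_MODEL = os.getenv("FALLBACK_MODEL_ID")
Used by error recovery: on 529 (overloaded) errors the loop switches to the fallback model. FALLBACK_MODEL may be None.

    SKILLS_DIR = WORKDIR / "skills"
    TRANSCRIPT_DIR = WORKDIR / ".transcripts"
    TOOL_RESULTS_DIR = WORKDIR / ".task_outputs" / "tool-results"
New directory constants; the / operator builds a Path. No mkdir happens here; directories are created on demand.

    DEFAULT_MAX_TOKENS = 8000
    ESCALATED_MAX_TOKENS = 16000
    MAX_RETRIES = 3
    MAX_CONSECUTIVE_529 = 2
    MAX_RECOVERY_RETRIES = 2
Error recovery configuration. max_tokens is raised to 16000 when stop_reason == "max_tokens"; after 2 consecutive 529 overloads the model is switched.

    CONTEXT_LIMIT = 50000
    KEEP_RECENT_TOOL_RESULTS = 3
    PERSIST_THRESHOLD = 30000
    CONTINUATION_PROMPT = "Continue from the previous response..."
    PROMPT = "\033[36ms20 >> \033[0m"
    CLI_ACTIVE = False
In order: the compression threshold, the number of recent tool results kept intact, the threshold above which large output is persisted to disk, the prompt injected to continue a truncated response, the CLI prompt string, and the flag that controls how terminal_print behaves.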

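3. terminal_print: thread-safe console output

New: in S19, several teammate threads call print() directly, which collides with the CLI prompt the user is typing into. S20 introduces terminal_print to solve this.

    def terminal_print(text: str):
        if threading.current_thread() is threading.main_thread() or not CLI_ACTIVE:
            print(text)
            return
On the main thread, or when the CLI is not active, print directly. threading.current_thread() returns the current thread; is compares object identity.

        line = ""
        if READLINE_AVAILABLE:
            try:
                line = readline.get_line_buffer()
            except Exception:
                line = ""
Save what the user has typed so far; readline.get_line_buffer() returns the current line buffer.

        print(f"\r\033[K{text}")
        print(PROMPT + line, end="", flush=True)
\r returns to the start of the line; \033[K is the ANSI sequence that erases from the cursor to the end of the line. After printing the new text, PROMPT plus the user's pending input is printed again; end="" suppresses the newline and flush=True flushes immediately.

Example: a background thread prints without breaking the CLI:
The user is typing: "s20 >> list tas|" (cursor after the final s)

A background teammate sends a message → terminal_print("[bus] alice → lead: Done")
→ print("\r\033[K[bus] alice → lead: Done") # erase the current line and print
→ print("s20 >> list tas", end="", flush=True) # restore the prompt and the typed input

The user sees:
[bus] alice → lead: Done
s20 >> list tas| # cursor is at the end of the restored input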
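The redraw described above can be reduced to a pure string-building function, which makes the escape sequences easy to inspect. This is a minimal sketch: redraw_sequence is a hypothetical helper, not part of S20, and it folds the two print() calls into one string.

```python
PROMPT = "\033[36ms20 >> \033[0m"  # same prompt constant as in the walkthrough

def redraw_sequence(text: str, pending_input: str) -> str:
    """Return the characters a background thread would write (hypothetical helper)."""
    # \r moves to column 0, \033[K erases the rest of the line, then the message
    # is printed followed by the newline that print() would add.
    clear_and_print = f"\r\033[K{text}\n"
    # Re-emit the prompt and whatever the user had typed, with no trailing newline.
    restore = PROMPT + pending_input
    return clear_and_print + restore

seq = redraw_sequence("[bus] alice → lead: Done", "list tas")
print(repr(seq))
```

Printing the repr shows that exactly one newline is emitted, so the restored prompt sits on the line directly below the message.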

4. Skill loading system

New: Skills are predefined instruction files (Markdown + YAML frontmatter) that the Agent loads on demand. S20 carries this capability over from S07.

    SKILL_REGISTRY: dict[str, dict] = {}
Global registry; key = skill name; value = {name, description, content}.

    def _parse_frontmatter(text: str) -> tuple[dict, str]:
        if not text.startswith("---"):
            return {}, text
        parts = text.split("---", 2)
YAML frontmatter parsing. str.split("---", 2) yields at most 3 pieces: parts[0] = "", parts[1] = the YAML, parts[2] = the body.

        try:
            meta = yaml.safe_load(parts[1]) or {}
        except yaml.YAMLError:
            meta = {}
yaml.safe_load() parses safely (no code execution); or {} guards against None; on a YAML error an empty dict is used.

        return meta, parts[2].strip()
Returns (metadata dict, body text).

    def scan_skills():
        SKILL_REGISTRY.clear()
        if not SKILLS_DIR.exists():
            return
        for directory in sorted(SKILLS_DIR.iterdir()):
            if not directory.is_dir():
                continue
            manifest = directory / "SKILL.md"
            if not manifest.exists():
                continue
            raw = manifest.read_text()
            meta, _ = _parse_frontmatter(raw)
            name = meta.get("name", directory.name)
            desc = meta.get("description", raw.split("\n")[0].lstrip("#").strip())
            SKILL_REGISTRY[name] = {
                "name": name,
                "description": desc,
                "content": raw,
            }
Scans the skills/ directory, one skill per subdirectory; SKILL.md must exist. name and description come from the frontmatter, with fallbacks: the directory name, and the first line of the raw file with leading # stripped. Because the fallback reads the raw file, a SKILL.md that has frontmatter but no description key falls back to the literal "---".

Example skills directory layout:
skills/
└── code-review/
    └── SKILL.md

SKILL.md contents:
---
name: code-review
description: Review code for bugs and style
---
# Code Review Instructions
...
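To see what split("---", 2) produces on a file like the one above, here is a small sketch that avoids the PyYAML dependency. split_frontmatter and parse_simple_meta are hypothetical stand-ins: the real code calls yaml.safe_load, and the len(parts) guard is an addition for files that open with --- but never close the block.

```python
def split_frontmatter(text: str) -> tuple[str, str]:
    """Split a document into (raw frontmatter, body); hypothetical helper."""
    if not text.startswith("---"):
        return "", text
    parts = text.split("---", 2)  # at most three pieces: "", header, body
    if len(parts) < 3:            # opening marker without a closing one
        return "", text
    return parts[1].strip(), parts[2].strip()

def parse_simple_meta(block: str) -> dict:
    """Parse flat 'key: value' lines; a stand-in for yaml.safe_load."""
    meta = {}
    for line in block.splitlines():
        if ":" in line:
            key, value = line.split(":", 1)
            meta[key.strip()] = value.strip()
    return meta

doc = (
    "---\n"
    "name: code-review\n"
    "description: Review code for bugs and style\n"
    "---\n"
    "# Code Review Instructions\n"
)
header, body = split_frontmatter(doc)
meta = parse_simple_meta(header)
print(meta["name"], "|", body)
```

The flat key/value parser only covers the simple frontmatter shown in this walkthrough; nested YAML would still need a real YAML parser.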

5. Three new tools: edit_file / glob / todo_write

run_edit: exact text replacement

    def run_edit(path: str, old_text: str, new_text: str, cwd: Path = None) -> str:
        try:
            fp = safe_path(path, cwd)
            text = fp.read_text()
            if old_text not in text:
                return f"Error: text not found in {path}"
            fp.write_text(text.replace(old_text, new_text, 1))
            return f"Edited {path}"
str.replace(old, new, 1) replaces only the first occurrence; the third argument count=1 prevents edits in multiple places. The in operator checks for a substring. (The except clause is omitted in this excerpt.)
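The effect of count=1 is easiest to see on a string where old_text appears more than once. The sketch below uses a hypothetical replace_first helper that also reports how many matches existed, which is one way a caller could detect an ambiguous edit; S20's run_edit does not report this.

```python
def replace_first(text: str, old: str, new: str) -> tuple[str, int]:
    """Replace only the first occurrence and report the total match count."""
    occurrences = text.count(old)
    if occurrences == 0:
        raise ValueError("text not found")
    # The third argument limits str.replace to a single substitution.
    return text.replace(old, new, 1), occurrences

source = "x = 1\ny = 1\n"
edited, hits = replace_first(source, "= 1", "= 2")
print(edited, hits)
```

Only the first line changes even though both lines match, so a short old_text can silently edit a different spot than the model intended.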

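run_glob: file pattern matching

    def run_glob(pattern: str, cwd: Path = None) -> str:
        import glob as g
        try:
            base = cwd or WORKDIR
            results = []
            for match in g.glob(pattern, root_dir=base):
                if (base / match).resolve().is_relative_to(base):
                    results.append(match)
            return "\n".join(results) if results else "(no matches)"
The import sits inside the function (deferred import). glob is a standard-library module; the root_dir parameter of glob.glob() requires Python 3.10+. The is_relative_to check guards against path traversal. The return line uses a conditional expression. (The except clause is omitted in this excerpt.)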

run_todo_write + _normalize_todos

    CURRENT_TODOS: list[dict] = []
Global todo state; it lives outside agent_loop and persists for the whole session.

    def _normalize_todos(todos):
        if isinstance(todos, str):
            try:
                todos = json.loads(todos)
            except json.JSONDecodeError:
                todos = ast.literal_eval(todos)
The LLM sometimes passes the list serialized as a string. When json.loads fails, ast.literal_eval is tried instead; it safely parses Python literals without executing code.

        for i, todo in enumerate(todos):
            if "content" not in todo or "status" not in todo:
                return None, f"Error: todos[{i}] missing..."
            if todo["status"] not in ("pending", "in_progress", "completed"):
                return None, f"Error: ...invalid status..."
        return todos, None
Validates item by item; enumerate() yields index and value together; returns the (data, error) pair idiom.

todo_write example:
run_todo_write([{"content":"write tests","status":"pending"},{"content":"fix bug","status":"in_progress"}])
→ _normalize_todos([...]) → ([...], None)
→ CURRENT_TODOS = [{"content":"write tests","status":"pending"},...]
Returns: "Updated 2 todos"
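The two-step parse matters because a model may emit either JSON (double quotes) or a Python repr (single quotes). A minimal sketch of just that fallback, with parse_todos as a hypothetical name for the first half of _normalize_todos:

```python
import ast
import json

def parse_todos(raw):
    """Accept a list, a JSON string, or a Python-literal string."""
    if isinstance(raw, str):
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            # literal_eval only evaluates literals (lists, dicts, strings, ...),
            # so it never runs arbitrary code.
            return ast.literal_eval(raw)
    return raw

json_form = '[{"content": "write tests", "status": "pending"}]'
repr_form = "[{'content': 'write tests', 'status': 'pending'}]"

from_json = parse_todos(json_form)
from_repr = parse_todos(repr_form)
print(from_json == from_repr)
```

Note that ast.literal_eval raises ValueError or SyntaxError on input that is neither form; the excerpt above does not show whether the caller catches those.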

rounds_since_todo reminder mechanism:
Each call to a tool other than todo_write does rounds_since_todo += 1
When rounds_since_todo >= 3, a reminder is injected: "<reminder>Update your todos.</reminder>"
A call to todo_write resets rounds_since_todo = 0

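6. Hooks system + permission pipeline

New: S19 has no hooks. S20 introduces four events, UserPromptSubmit / PreToolUse / PostToolUse / Stop, so that cross-cutting logic such as permission checks and logging can be inserted without modifying tool code.

Hook registration and dispatch

    HOOKS = {"UserPromptSubmit": [], "PreToolUse": [], "PostToolUse": [], "Stop": []}
A dict holding the callback list for each event; every list starts empty.

    def register_hook(event: str, callback):
        HOOKS[event].append(callback)
list.append(); callback can be any callable (function, lambda, method).

    def trigger_hooks(event: str, *args):
        for callback in HOOKS[event]:
            result = callback(*args)
            if result is not None:
                return result
        return None
*args collects variable positional arguments. The first callback that returns a non-None value short-circuits the loop: None means "continue", non-None means "intercepted / error".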
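The short-circuit rule has a visible consequence: hooks registered after an intercepting hook never run for that call. A self-contained sketch, where the two callbacks deny_sudo and log_call are hypothetical examples rather than S20's actual hooks:

```python
HOOKS = {"PreToolUse": []}

def register_hook(event: str, callback):
    HOOKS[event].append(callback)

def trigger_hooks(event: str, *args):
    for callback in HOOKS[event]:
        result = callback(*args)
        if result is not None:
            return result  # first non-None result wins; later hooks are skipped
    return None

calls = []

def deny_sudo(command: str):
    calls.append("deny_sudo")
    return "Permission denied" if "sudo" in command else None

def log_call(command: str):
    calls.append("log_call")
    return None

register_hook("PreToolUse", deny_sudo)
register_hook("PreToolUse", log_call)

allowed = trigger_hooks("PreToolUse", "ls")
blocked = trigger_hooks("PreToolUse", "sudo ls")
print(allowed, blocked, calls)
```

For the blocked command only deny_sudo runs, so a logging hook registered after a permission hook will not record denied calls.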

permission_hook: the permission layer

    DENY_LIST = ["rm -rf /", "sudo", "shutdown", "reboot", "mkfs", "dd if="]
    DESTRUCTIVE = ["rm ", "> /etc/", "chmod 777"]
Two-tier policy: DENY_LIST entries are refused outright; DESTRUCTIVE entries require user confirmation.

    def permission_hook(block):
        if block.name == "bash":
            command = block.input.get("command", "")
            for pattern in DENY_LIST:
                if pattern in command:
                    return f"Permission denied: '{pattern}' is on the deny list"
The argument is a tool_use block; block.name is the tool name; the in operator does a substring check; returning an error string means "intercept".

            if any(token in command for token in DESTRUCTIVE):
                print(f"\n[permission] destructive command")
                choice = input(" Allow? [y/N] ").strip().lower()
                if choice not in ("y", "yes"):
                    return "Permission denied by user"
any() over a generator; interactive confirmation; input() blocks until the user answers; .strip().lower() normalizes the reply.

        if block.name.startswith("mcp__") and "deploy" in block.name:
            ...
            return "Permission denied by user"
Destructive MCP tools also need confirmation; str.startswith() plus in make a double check.

    register_hook("UserPromptSubmit", user_prompt_hook)
    register_hook("PreToolUse", permission_hook)
    register_hook("PreToolUse", log_hook)
    register_hook("PostToolUse", large_output_hook)
    register_hook("Stop", stop_hook)
Module-level registration. PreToolUse has two hooks: permission (which can intercept) runs before log (which only prints); multiple hooks run in registration order.
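Because both lists are matched by plain substring, the policy can be explored without any prompt or tool block. The sketch below reuses the two lists from the walkthrough; classify is a hypothetical helper that returns the tier a bash command would land in instead of calling input().

```python
DENY_LIST = ["rm -rf /", "sudo", "shutdown", "reboot", "mkfs", "dd if="]
DESTRUCTIVE = ["rm ", "> /etc/", "chmod 777"]

def classify(command: str) -> str:
    """Return 'deny', 'confirm', or 'allow' using the same substring tests."""
    if any(pattern in command for pattern in DENY_LIST):
        return "deny"
    if any(token in command for token in DESTRUCTIVE):
        return "confirm"
    return "allow"

samples = [
    "ls -la",
    "sudo apt install jq",
    "rm build.log",
    "cat reboot-notes.txt",  # contains "reboot" only as part of a file name
    "echo confirm done",     # "confirm " ends with "rm "
]
verdicts = {cmd: classify(cmd) for cmd in samples}
for cmd, verdict in verdicts.items():
    print(f"{verdict:8} {cmd}")
```

The last two samples show that substring matching is conservative: harmless commands can be denied or sent for confirmation, which is an acceptable trade-off for teaching code but worth knowing when reading the hook.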

7. spawn_subagent: isolated subagent

New: S19's spawn_teammate starts a long-lived background thread. S20's spawn_subagent is a one-shot, synchronous subagent that returns a text summary when it finishes. It is meant for complex subtasks that need an isolated context.

    SUB_SYSTEM = (
        f"You are a coding subagent at {WORKDIR}. "
        "Complete the task, then return a concise final summary. "
        "Do not spawn more agents."
    )
The f-string embeds WORKDIR; adjacent string literals inside the parentheses are concatenated implicitly; recursive spawning of subagents is explicitly forbidden.

    def has_tool_use(content) -> bool:
        return any(getattr(block, "type", None) == "tool_use" for block in content)
any() over a generator. getattr() with a default avoids an AttributeError on blocks that have no type attribute; note that a block stored as a plain dict would yield None here and would not count as a tool_use. This is the continuation test used instead of stop_reason.

    def spawn_subagent(description: str) -> str:
        messages = [{"role": "user", "content": description}]
        for _ in range(30):
At most 30 rounds; description is the task description.

            response = client.messages.create(
                model=MODEL, system=SUB_SYSTEM, messages=messages,
                tools=SUB_TOOLS, max_tokens=8000)
The subagent uses its own SUB_TOOLS set (bash/read/write/edit/glob, with no teammate or MCP tools).

            messages.append({"role": "assistant", "content": response.content})
            if not has_tool_use(response.content):
                break
No tool_use means the subagent is done, so the loop exits.

            blocked = trigger_hooks("PreToolUse", block)
            if blocked:
                output = str(blocked)
            else:
                handler = SUB_HANDLERS.get(block.name)
                output = call_tool_handler(handler, block.input, block.name)
                trigger_hooks("PostToolUse", block, output)
The subagent goes through the hooks too, so permission_hook applies and commands run by the subagent are subject to the same permission control.

        for msg in reversed(messages):
            if msg["role"] == "assistant":
                text = extract_text(msg["content"])
                if text:
                    return text
        return "Subagent finished without a text summary."
Extracts text from the most recent assistant message; reversed() walks from the end; extract_text handles the block list.

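8. Context compression system

New: four layers of compression, stacked as needed, keep the context window from overflowing.

The four layers at a glance

    def tool_result_budget(messages, max_bytes=200_000):
Layer 1: oversized tool results are persisted to disk and replaced with a preview.

    def snip_compact(messages, max_messages=50):
Layer 2: past 50 messages, the middle is cut out and a [snipped N messages] placeholder is inserted.

    def micro_compact(messages):
Layer 3: only the 3 most recent tool results keep their full content; older ones are shortened.

    def compact_history(messages):
Layer 4 (heavyweight): the LLM summarizes the whole history into a single message.

prepare_context: the single entry point

    def prepare_context(messages: list) -> list:
        messages[:] = tool_result_budget(messages)
        messages[:] = snip_compact(messages)
        messages[:] = micro_compact(messages)
        if estimate_size(messages) > CONTEXT_LIMIT:
            messages[:] = compact_history(messages)
        return messages
messages[:] = ... replaces the list contents in place, so the caller's reference stays valid. The layers run in order, and the LLM summary (which is expensive) is only invoked when the first three layers are not enough.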
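The difference between slice assignment and plain rebinding is what lets prepare_context shrink the same list object that the rest of the loop holds. A minimal sketch with two hypothetical functions that keep only the last two items:

```python
def compact_rebind(messages: list) -> list:
    messages = messages[-2:]      # rebinds the local name only
    return messages

def compact_in_place(messages: list) -> list:
    messages[:] = messages[-2:]   # rewrites the contents of the shared list
    return messages

history = ["m1", "m2", "m3", "m4"]
alias = history                   # e.g. the reference held by the CLI loop

compact_rebind(history)
after_rebind = list(alias)        # the caller still sees all four messages

compact_in_place(history)
after_in_place = list(alias)      # the caller now sees the compacted list

print(after_rebind, after_in_place)
```

With rebinding, the caller would have to capture the return value everywhere; slice assignment makes the compaction visible through every existing reference.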
persist_large_output example:
tool_use_id = "toolu_abc123"
output = "..." (40000 characters, above PERSIST_THRESHOLD=30000)

→ path = .task_outputs/tool-results/toolu_abc123.txt
→ path.write_text(output)
→ returns "<persisted-output>\nFull output: .task_outputs/.../toolu_abc123.txt\nPreview:\n{output[:2000]}\n</persisted-output>"

The LLM sees the preview plus the path and can call read_file to fetch the full content when needed.

snip_compact: cutting the middle (with tool-pair protection)

    def snip_compact(messages: list, max_messages: int = 50) -> list:
        if len(messages) <= max_messages:
            return messages
        head_end, tail_start = 3, len(messages) - (max_messages - 3)
Keeps the first 3 and the last 47 messages (50 originals, so 51 entries once the placeholder is added); head_end = 3, tail_start = len - 47.

        if head_end > 0 and message_has_tool_use(messages[head_end - 1]):
            while head_end < len(messages) and is_tool_result_message(messages[head_end]):
                head_end += 1
Protects tool pairs: if the last head message, messages[2], contains a tool_use, the head is extended until it includes the matching tool_result, which prevents an orphaned tool_use block.

        snipped = tail_start - head_end
        return (messages[:head_end]
                + [{"role": "user", "content": f"[snipped {snipped} messages]"}]
                + messages[tail_start:])
List slices concatenated with a placeholder message in the middle; snipped is the number of messages actually removed.
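A worked run makes the index arithmetic concrete. The sketch below repeats snip_compact as shown and supplies hypothetical versions of the two predicates, assuming messages are plain dicts whose content is either a string or a list of block dicts; S20's own predicates may differ.

```python
def _has_block(msg: dict, block_type: str) -> bool:
    content = msg.get("content")
    return isinstance(content, list) and any(
        isinstance(b, dict) and b.get("type") == block_type for b in content)

def message_has_tool_use(msg: dict) -> bool:      # assumed implementation
    return _has_block(msg, "tool_use")

def is_tool_result_message(msg: dict) -> bool:    # assumed implementation
    return _has_block(msg, "tool_result")

def snip_compact(messages: list, max_messages: int = 50) -> list:
    if len(messages) <= max_messages:
        return messages
    head_end, tail_start = 3, len(messages) - (max_messages - 3)
    if head_end > 0 and message_has_tool_use(messages[head_end - 1]):
        while head_end < len(messages) and is_tool_result_message(messages[head_end]):
            head_end += 1
    snipped = tail_start - head_end
    return (messages[:head_end]
            + [{"role": "user", "content": f"[snipped {snipped} messages]"}]
            + messages[tail_start:])

# 60 plain text messages: head = 0..2, tail = 13..59, ten removed.
plain = [{"role": "user" if i % 2 == 0 else "assistant", "content": f"m{i}"}
         for i in range(60)]
out_plain = snip_compact(plain)

# Same history, but message 2 is a tool_use answered by message 3.
paired = list(plain)
paired[2] = {"role": "assistant",
             "content": [{"type": "tool_use", "id": "t1", "name": "bash", "input": {}}]}
paired[3] = {"role": "user",
             "content": [{"type": "tool_result", "tool_use_id": "t1", "content": "ok"}]}
out_paired = snip_compact(paired)

print(len(out_plain), out_plain[3]["content"])
print(len(out_paired), out_paired[4]["content"])
```

In the second case the head grows to four messages so the tool_result stays next to its tool_use, and the placeholder reports nine removed messages instead of ten.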

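9. Error recovery: RecoveryState + with_retry

New: when S19 hits an API error it simply returns an error message. S20 introduces a full error recovery state machine.

The RecoveryState state machine

    class RecoveryState:
        def __init__(self):
            self.has_escalated = False
            self.recovery_count = 0
            self.consecutive_529 = 0
            self.has_attempted_reactive_compact = False
            self.current_model = PRIMARY_MODEL
A new instance is created for every agent_loop call. has_escalated records whether max_tokens has already been raised to 16000; consecutive_529 counts consecutive overloads; current_model is switched dynamically.

    def retry_delay(attempt: int) -> float:
        base = min(BASE_DELAY_MS * (2 ** attempt), 32000) / 1000
        return base + random.uniform(0, base * 0.25)
Exponential backoff: 0.5s, 1s, 2s, ... (which implies BASE_DELAY_MS = 500; the constant is not listed in section 2). min() caps the delay at 32s; random.uniform() adds up to 25% jitter so that multiple instances do not retry in lockstep.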
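The backoff schedule can be tabulated directly. This sketch assumes BASE_DELAY_MS = 500, inferred from the 0.5s, 1s, 2s sequence above rather than read from the source, and splits out the deterministic part as a hypothetical base_delay helper.

```python
import random

BASE_DELAY_MS = 500  # assumption: inferred from the 0.5s / 1s / 2s sequence

def base_delay(attempt: int) -> float:
    """Deterministic part of the delay, in seconds, capped at 32s."""
    return min(BASE_DELAY_MS * (2 ** attempt), 32000) / 1000

def retry_delay(attempt: int) -> float:
    base = base_delay(attempt)
    return base + random.uniform(0, base * 0.25)  # up to 25% jitter on top

schedule = [base_delay(n) for n in range(8)]
print(schedule)

sample = retry_delay(2)
print(2.0 <= sample <= 2.5)
```

With MAX_RETRIES = 3 only the first three entries are reachable inside with_retry; the 32s cap matters only if the retry count is raised.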

with_retry: the retry logic

    def with_retry(fn, state: RecoveryState):
        for attempt in range(MAX_RETRIES):
            try:
                result = fn()
                state.consecutive_529 = 0
                return result
fn is a lambda such as lambda: client.messages.create(...); on success the 529 counter is reset.

            except Exception as e:
                name = type(e).__name__.lower()
                msg = str(e).lower()
                if "ratelimit" in name or "429" in msg:
                    delay = retry_delay(attempt)
                    time.sleep(delay)
                    continue
429 rate limit: back off and retry. type(e).__name__ gives the exception class name; str(e) gives the message; .lower() makes the match case-insensitive.

                if "overloaded" in name or "529" in msg or "overloaded" in msg:
                    state.consecutive_529 += 1
                    if state.consecutive_529 >= MAX_CONSECUTIVE_529 and FALLBACK_MODEL:
                        state.current_model = FALLBACK_MODEL
                        state.consecutive_529 = 0
529 overloaded: count it; after 2 in a row, switch to FALLBACK_MODEL (if one is configured).

        raise RuntimeError(f"Max retries ({MAX_RETRIES}) exceeded")
Raised when every retry has failed; the caller's except catches it.
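A reduced version of the retry loop can be exercised without a real API. In this sketch RateLimitError is a stand-in exception class, the sleep function is injectable so the demo runs instantly, the 529 branch is left out, and re-raising unrecognized errors is an assumption, since the excerpt does not show that path.

```python
MAX_RETRIES = 3

class RateLimitError(Exception):
    """Stand-in for an SDK rate-limit exception (hypothetical)."""

def with_retry(fn, sleep=lambda seconds: None):
    """Retry fn on rate-limit errors, up to MAX_RETRIES attempts."""
    for attempt in range(MAX_RETRIES):
        try:
            return fn()
        except Exception as exc:
            name = type(exc).__name__.lower()
            if "ratelimit" in name or "429" in str(exc).lower():
                sleep(0.5 * (2 ** attempt))  # backoff without jitter, for brevity
                continue
            raise  # assumption: unrecognized errors propagate unchanged
    raise RuntimeError(f"Max retries ({MAX_RETRIES}) exceeded")

attempts = []

def flaky():
    attempts.append(len(attempts))
    if len(attempts) < 3:
        raise RateLimitError("429 too many requests")
    return "ok"

result = with_retry(flaky)
print(result, len(attempts))

def always_limited():
    raise RateLimitError("429 too many requests")

try:
    with_retry(always_limited)
    exhausted = False
except RuntimeError:
    exhausted = True
print(exhausted)
```

The first call succeeds on the third and final attempt; the second exhausts all three attempts and surfaces the RuntimeError that the agent loop is expected to catch.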

10. S20 Agent Loop: the comprehensive version

S20's agent_loop integrates every mechanism from S01-S19. Only the key new logic is shown below; parts already present in S19 are not repeated.

    def agent_loop(messages: list, context: dict):
        global rounds_since_todo
        tools, handlers = assemble_tool_pool()
        state = RecoveryState()
        max_tokens = DEFAULT_MAX_TOKENS
global declares that a module-level variable will be modified; a fresh RecoveryState is created for each loop; max_tokens is adjusted dynamically.

        while True:
            fired = consume_cron_queue()
            for job in fired:
                messages.append({"role": "user", "content": f"[Scheduled] {job.prompt}"})
Each round starts by injecting due cron jobs; consume_cron_queue() consumes them in a thread-safe way.

            inject_background_notifications(messages)
Injects notifications for background tasks that have finished.

            if rounds_since_todo >= 3:
                messages.append({"role": "user", "content": "<reminder>Update your todos.</reminder>"})
Reminds the model after 3 rounds without a todo update; the XML tag helps the LLM recognize this as a system reminder.

            prepare_context(messages)
The four compression layers.

            try:
                response = call_llm(messages, context, tools, state, max_tokens)
            except Exception as e:
                if is_prompt_too_long_error(e) and not state.has_attempted_reactive_compact:
                    messages[:] = reactive_compact(messages)
                    state.has_attempted_reactive_compact = True
                    continue
prompt_too_long → reactive compaction (keeps the 5 most recent messages); attempted only once; continue re-enters the loop.

            if response.stop_reason == "max_tokens":
                if not state.has_escalated:
                    max_tokens = ESCALATED_MAX_TOKENS
                    state.has_escalated = True
                    continue
                messages.append({"role": "assistant", "content": response.content})
                if state.recovery_count < MAX_RECOVERY_RETRIES:
                    messages.append({"role": "user", "content": CONTINUATION_PROMPT})
                    state.recovery_count += 1
                    continue
max_tokens truncation: first retry with the limit raised to 16000. If the response is still truncated after escalation, keep the partial response and inject CONTINUATION_PROMPT so the LLM continues; at most MAX_RECOVERY_RETRIES = 2 continuations.

            for block in response.content:
                if block.name == "compact":
                    messages[:] = compact_history(messages)
                    messages.append({"role": "user", "content": "[Compacted. Continue with summarized context.]"})
                    compacted_now = True
                    break
When the LLM itself calls the compact tool, the history is compacted immediately and the loop breaks; the compacted_now flag skips appending results, because messages has just been reset.

                blocked = trigger_hooks("PreToolUse", block)
                if blocked:
                    results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(blocked)})
                    continue
Permission interception: the reason is returned as the tool_result, so the LLM sees why it was refused and can adjust its approach.

11. cron_autorun_loop: background auto-run thread

New: S19 has no cron. S20 re-integrates cron and adds cron_autorun_loop, which runs due cron jobs even while the user is not typing anything.

    def cron_autorun_loop(history: list, context: dict):
        while True:
            time.sleep(1)
            fired = consume_cron_queue()
            if not fired:
                continue
Polls every 1s; consume_cron_queue() is thread-safe; when nothing fired, continue (the LLM is not triggered).

            with agent_lock:
                turn_start = len(history)
                for job in fired:
                    history.append({"role": "user", "content": f"[Scheduled] {job.prompt}"})
                    terminal_print(f" [cron auto] {job.prompt[:60]}")
                agent_loop(history, context)
                context.update(update_context(context, history))
                print_turn_assistants(history, turn_start)
The agent_lock mutex prevents cron and user input from calling agent_loop at the same time; turn_start records where this turn begins in history; context.update() updates the dict in place.
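Concurrency safety: agent_lock
agent_lock = threading.Lock() # module level

Main thread (user input):
with agent_lock:
  agent_loop(history, context)

Background thread (cron_autorun_loop):
with agent_lock:
  agent_loop(history, context)

The Lock guarantees that only one thread runs agent_loop at a time, which prevents concurrent modification of history.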
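The guarantee can be observed with two threads that each append a short "turn" to a shared list. This is a toy sketch: fake_agent_loop is a hypothetical stand-in for agent_loop and simply records which caller produced each entry.

```python
import threading
import time

agent_lock = threading.Lock()
history = []

def fake_agent_loop(source: str, steps: int = 3):
    """Stand-in for agent_loop: appends several entries for one turn."""
    for step in range(steps):
        history.append((source, step))
        time.sleep(0.001)  # give the other thread a chance to interleave

def run_turn(source: str):
    with agent_lock:       # only one turn may touch history at a time
        fake_agent_loop(source)

threads = [threading.Thread(target=run_turn, args=(name,))
           for name in ("user", "cron")]
for t in threads:
    t.start()
for t in threads:
    t.join()

first_turn = [source for source, _ in history[:3]]
second_turn = [source for source, _ in history[3:]]
print(first_turn, second_turn)
```

Whichever thread acquires the lock first finishes its whole turn before the other starts, so each half of history belongs to a single caller.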

12. Plan approval gate: a real blocking implementation

Improved: the plan approval in S16 is "protocol-level" (it does not block tool execution). S20 implements a real code-level gate: after submit_plan, the teammate stops iterating with the LLM until it receives approval.

    protocol_ctx = {"waiting_plan": None}
A dict used as a mutable state container; waiting_plan is the req_id currently awaiting approval, and None means nothing is pending.

    if msg_type == "plan_approval_response":
        approve = meta.get("approve", False)
        if req_id == protocol_ctx["waiting_plan"]:
            protocol_ctx["waiting_plan"] = None
When an approval response arrives, check that the req_id matches; on a match the gate is unlocked (waiting_plan = None).

    if protocol_ctx["waiting_plan"]:
        # Poll only for protocol replies while the approval gate is
        # closed; do not let the model continue with the task.
        time.sleep(IDLE_POLL_INTERVAL)
        continue
While the gate is closed, the LLM call is skipped and only the inbox is polled. This is real code-level blocking, which S16 does not have.

    if block.name == "submit_plan":
        output = _teammate_submit_plan(
            name, block.input.get("plan", ""))
        match = re.search(r"\((req_\d+)\)", output)
        protocol_ctx["waiting_plan"] = (
            match.group(1) if match else output)
After the plan is submitted, the req_id is extracted with re.search() from the returned string "Plan submitted (req_042871)" and stored in waiting_plan. If the pattern does not match, the whole output string is stored instead.

    if protocol_ctx["waiting_plan"]:
        # Ignore later tool_use blocks from the same model
        # response; they belong after approval, not before.
        break
Any tool_use that follows submit_plan in the same response is ignored; only the submit_plan result is handled, and break exits the for loop.

Full flow of the plan approval gate:
Teammate thread:
[WORK phase]
LLM → tool_use {name:"submit_plan", plan:"I plan to..."}
→ _teammate_submit_plan("alice", "I plan to...")
→ "Plan submitted (req_042871)"
→ protocol_ctx["waiting_plan"] = "req_042871"
→ break (ignore other tool_use blocks from the same turn)

[GATE CLOSED: waiting_plan = "req_042871"]
Next iterations of for _ in range(10):
  if protocol_ctx["waiting_plan"]:
    time.sleep(5); continue <-- the LLM is not called!
    time.sleep(5); continue <-- keeps waiting until the inbox receives an approval

plan_approval_response received (approve=True, req_id=req_042871):
  protocol_ctx["waiting_plan"] = None (unlocked)
  messages.append("[Plan approved] Proceed")

[GATE OPEN]
Next iteration of the for loop: waiting_plan=None → normal LLM call
The LLM sees "[Plan approved] Proceed" → continues with the task
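The gate's state transitions can be traced without threads or an inbox. In this sketch extract_req_id, on_submit, and on_approval_response are hypothetical wrappers around the lines shown above, and the second sample string is an invented error message used only to show the no-match path.

```python
import re

protocol_ctx = {"waiting_plan": None}

def extract_req_id(output: str) -> str:
    """Pull 'req_<digits>' out of the parentheses, else return output unchanged."""
    match = re.search(r"\((req_\d+)\)", output)
    return match.group(1) if match else output

def on_submit(output: str):
    protocol_ctx["waiting_plan"] = extract_req_id(output)

def on_approval_response(req_id: str):
    # Only a response for the pending request opens the gate.
    if req_id == protocol_ctx["waiting_plan"]:
        protocol_ctx["waiting_plan"] = None

on_submit("Plan submitted (req_042871)")
closed_after_submit = protocol_ctx["waiting_plan"]

on_approval_response("req_000001")          # unrelated request: gate stays closed
still_closed = protocol_ctx["waiting_plan"]

on_approval_response("req_042871")          # matching request: gate opens
open_after_match = protocol_ctx["waiting_plan"]

fallback = extract_req_id("Error: plan is empty")  # hypothetical output, no req id
print(closed_after_submit, still_closed, open_after_match, fallback)
```

The fallback case is worth noticing: a non-empty output without a req id is still truthy, so the gate closes on a value that no approval response is likely to match.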