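S20: Comprehensive Agent (Code Walkthrough)
Final chapter: integrates all teaching components from S01-S19 into one complete Agent Loop. Relative to S19, S20 adds: Skill loading, edit_file/glob/todo_write, Hooks plus a permission layer, Subagent, the context compaction system, error recovery, re-integrated Cron, a plan approval gate (real blocking), terminal_print, and cron_autorun_loop.
1. Overview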
01
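Where S20 fits
S19 already supports dynamic tool extension through MCP, but it lacks core capabilities from earlier chapters: permission control, context compaction, error recovery, Skill loading, and more. S20 is the "comprehensive version": it consolidates the capabilities of every teaching chapter into a single file, as a complete reference for studying the Claude Code architecture.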
02
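Main modules added since S19
• terminal_print: thread-safe printing
• SKILL_REGISTRY + scan_skills: Skill file scanning and loading
• run_edit / run_glob / run_todo_write: three new tools
• HOOKS + permission_hook: hook system plus permission pipeline
• spawn_subagent: isolated subagent
• Context compaction system: tool_result_budget / snip / micro / compact
• Error recovery: RecoveryState + with_retry
• Cron re-integrated (content from S14)
• cron_autorun_loop: background thread that runs cron jobs automatically
• Real plan approval gate: protocol_ctx["waiting_plan"]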
03
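Architecture layers
The S20 Agent Loop has five phases:
1. Inject cron and background results
2. Preprocess the context (compaction)
3. Call the LLM (with error recovery)
4. Execute tools (including permission hooks and background task dispatch)
5. Merge results back into messages
2. New constants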
| PRIMARY_MODEL = MODEL
FALLBACK_MODEL = os.getenv("FALLBACK_MODEL_ID") |
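Used for error recovery: on a 529 (overloaded) error the loop can switch to the fallback model; FALLBACK_MODEL may be None, because os.getenv returns None when the variable is unset |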
| SKILLS_DIR = WORKDIR / "skills"
TRANSCRIPT_DIR = WORKDIR / ".transcripts"
TOOL_RESULTS_DIR = WORKDIR / ".task_outputs" / "tool-results" |
New directory constants; the / operator builds Path objects; the directories are not created up front (mkdir happens on demand) |
| DEFAULT_MAX_TOKENS = 8000
ESCALATED_MAX_TOKENS = 16000
MAX_RETRIES = 3
MAX_CONSECUTIVE_529 = 2
MAX_RECOVERY_RETRIES = 2 |
Error recovery settings; when stop_reason == "max_tokens", max_tokens is raised to 16000; after 2 consecutive 529 (overloaded) errors the model is switched |
| CONTEXT_LIMIT = 50000
KEEP_RECENT_TOOL_RESULTS = 3
PERSIST_THRESHOLD = 30000
CONTINUATION_PROMPT = "Continue from the previous response..."
PROMPT = "\033[36ms20 >> \033[0m"
CLI_ACTIVE = False |
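CONTEXT_LIMIT is the compaction threshold; KEEP_RECENT_TOOL_RESULTS is the number of recent tool results kept in full; PERSIST_THRESHOLD is the size above which a large output is persisted to disk; CONTINUATION_PROMPT is injected after a truncated response; PROMPT is the CLI prompt; CLI_ACTIVE controls how terminal_print behaves |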
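3. terminal_print: thread-safe console output
New. In S19, multiple teammate threads call print() directly, which collides with the CLI prompt the user is typing into. S20 introduces terminal_print to solve this.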
| def terminal_print(text: str):
    if threading.current_thread() is threading.main_thread() or not CLI_ACTIVE:
        print(text)
        return |
On the main thread, or when the CLI is not active, print directly; threading.current_thread() returns the current thread; is compares object identity |
| line = ""
if READLINE_AVAILABLE:
    try:
        line = readline.get_line_buffer()
    except Exception:
        line = "" |
Save what the user is currently typing; readline.get_line_buffer() returns the contents of the current line buffer |
| print(f"\r\033[K{text}")
print(PROMPT + line, end="", flush=True) |
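\r returns to the start of the line; \033[K is the ANSI sequence that erases from the cursor to the end of the line; after printing the new text, PROMPT plus the user's pending input is printed again; end="" suppresses the newline; flush=True flushes immediately |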
Example: a background thread prints without breaking the CLI:
The user is typing: "s20 >> list tas|" (cursor right after the final s)
A background teammate sends a message → terminal_print("[bus] alice → lead: Done")
→ print("\r\033[K[bus] alice → lead: Done")
→ print("s20 >> list tas", end="", flush=True)
The user sees:
[bus] alice → lead: Done
s20 >> list tas|
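The redraw sequence above can be exercised without a real terminal. The following is a minimal sketch, not the chapter's implementation: it takes the pending input and the output stream as parameters (both are assumptions made for testability, standing in for readline.get_line_buffer() and stdout), and it uses a plain prompt without color codes.

```python
import io

PROMPT = "s20 >> "  # plain prompt; the original wraps it in ANSI color codes


def redraw_with_message(text: str, pending_input: str, out) -> None:
    """Print `text` on its own line, then restore the prompt and typed input.

    `pending_input` stands in for readline.get_line_buffer() (assumption).
    """
    # \r returns to column 0; ESC[K erases from the cursor to end of line.
    out.write(f"\r\033[K{text}\n")
    # Re-emit the prompt and whatever the user had typed, without a newline.
    out.write(PROMPT + pending_input)
    out.flush()


buffer = io.StringIO()
redraw_with_message("[bus] alice -> lead: Done", "list tas", buffer)
rendered = buffer.getvalue()
```

The captured string shows the two writes in order: the erased line carrying the message, then the restored prompt with the user's partial input.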
4. Skill loading system
New. Skills are predefined instruction files (Markdown with YAML frontmatter) that the Agent can load on demand. S20 ports this capability from S07.
| SKILL_REGISTRY: dict[str, dict] = {} |
Global registry; key = skill name; value = {name, description, content} |
| def _parse_frontmatter(text: str) -> tuple[dict, str]:
    if not text.startswith("---"):
        return {}, text
    parts = text.split("---", 2) |
YAML frontmatter parsing; str.split("---", 2) splits into at most 3 parts; for a well-formed file, parts[0] = "", parts[1] = YAML, parts[2] = body |
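| try:
    meta = yaml.safe_load(parts[1]) or {}
except yaml.YAMLError:
    meta = {} |
yaml.safe_load() parses safely (it does not construct arbitrary Python objects); or {} guards against None for an empty block; on a parse error meta falls back to an empty dict |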
| return meta, parts[2].strip() |
Returns (metadata dict, body text) |
| def scan_skills():
    SKILL_REGISTRY.clear()
    if not SKILLS_DIR.exists():
        return
    for directory in sorted(SKILLS_DIR.iterdir()):
        if not directory.is_dir():
            continue
        manifest = directory / "SKILL.md"
        if not manifest.exists():
            continue
        raw = manifest.read_text()
        meta, _ = _parse_frontmatter(raw)
        name = meta.get("name", directory.name)
        desc = meta.get("description", raw.split("\n")[0].lstrip("#").strip())
        SKILL_REGISTRY[name] = {
            "name": name,
            "description": desc,
            "content": raw,
        } |
Scans the skills/ directory; one skill per subdirectory; a subdirectory without SKILL.md is skipped; name and description are read from the frontmatter, with fallbacks (the directory name, and the first line of the file with leading # characters removed) |
Example skills directory layout:
skills/
└── code-review/
└── SKILL.md
Contents of SKILL.md:
---
name: code-review
description: Review code for bugs and style
---
# Code Review Instructions
...
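To make the scanning flow concrete, here is a small sketch that builds the directory layout above in a temporary folder and scans it. It is an illustration under stated assumptions rather than the chapter's code: read_simple_meta is a hypothetical stand-in for yaml.safe_load that understands only flat "key: value" lines, and split_frontmatter adds a guard for a missing closing delimiter, which the excerpt does not show.

```python
import tempfile
from pathlib import Path


def split_frontmatter(text: str) -> tuple[str, str]:
    """Return (raw_frontmatter, body); ('', text) if no complete block exists."""
    if not text.startswith("---"):
        return "", text
    parts = text.split("---", 2)  # ['', frontmatter, body] when well-formed
    if len(parts) < 3:            # opening '---' without a closing one
        return "", text
    return parts[1].strip(), parts[2].strip()


def read_simple_meta(raw: str) -> dict:
    """Stand-in for yaml.safe_load: flat 'key: value' lines only (assumption)."""
    meta = {}
    for line in raw.splitlines():
        key, sep, value = line.partition(":")
        if sep:
            meta[key.strip()] = value.strip()
    return meta


def scan(skills_dir: Path) -> dict:
    """Build a registry with one entry per subdirectory that has a SKILL.md."""
    registry = {}
    for directory in sorted(skills_dir.iterdir()):
        manifest = directory / "SKILL.md"
        if not directory.is_dir() or not manifest.exists():
            continue
        raw = manifest.read_text(encoding="utf-8")
        front, _body = split_frontmatter(raw)
        meta = read_simple_meta(front)
        name = meta.get("name", directory.name)
        registry[name] = {"name": name,
                          "description": meta.get("description", ""),
                          "content": raw}
    return registry


with tempfile.TemporaryDirectory() as tmp:
    skill = Path(tmp) / "code-review"
    skill.mkdir()
    (skill / "SKILL.md").write_text(
        "---\nname: code-review\n"
        "description: Review code for bugs and style\n---\n"
        "# Code Review Instructions\n", encoding="utf-8")
    registry = scan(Path(tmp))
```

As in the original, the stored content is the raw file, so the frontmatter travels with the instructions when the skill is loaded.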
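6. Hooks system + permission pipeline
New. S19 has no hooks. S20 introduces four events, UserPromptSubmit / PreToolUse / PostToolUse / Stop, which let cross-cutting logic such as permission checks and logging be inserted without modifying tool code.
Hook registration and triggering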
| HOOKS = {"UserPromptSubmit": [], "PreToolUse": [],
         "PostToolUse": [], "Stop": []} |
A dict that maps each event to its list of callbacks; every list starts empty |
| def register_hook(event: str, callback):
    HOOKS[event].append(callback) |
list.append(); callback can be any callable (function, lambda, method) |
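| def trigger_hooks(event: str, *args):
    for callback in HOOKS[event]:
        result = callback(*args)
        if result is not None:
            return result
    return None |
*args collects variadic positional arguments; as soon as a callback returns a non-None value, that value is returned immediately (short-circuit) and later callbacks do not run; None means "continue", non-None means "blocked / error" |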
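The short-circuit behaviour of trigger_hooks is easiest to see by recording which callbacks actually ran. The sketch below reuses the two functions as shown; the three hooks (allow_all, block_bash, audit) and the use of a plain tool-name string instead of a tool_use block are hypothetical simplifications.

```python
HOOKS = {"PreToolUse": []}
calls = []  # records the order in which hooks were invoked


def register_hook(event, callback):
    HOOKS[event].append(callback)


def trigger_hooks(event, *args):
    for callback in HOOKS[event]:
        result = callback(*args)
        if result is not None:
            return result  # first non-None result wins; later hooks are skipped
    return None


def allow_all(tool_name):
    calls.append("allow_all")
    return None


def block_bash(tool_name):
    calls.append("block_bash")
    return "blocked: bash" if tool_name == "bash" else None


def audit(tool_name):
    calls.append("audit")
    return None


for hook in (allow_all, block_bash, audit):
    register_hook("PreToolUse", hook)

verdict_bash = trigger_hooks("PreToolUse", "bash")
calls_for_bash = list(calls)
calls.clear()
verdict_read = trigger_hooks("PreToolUse", "read_file")
calls_for_read = list(calls)
```

For the bash call, audit never runs because block_bash returned a value first. Registration order therefore decides which hooks observe a blocked call.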
permission_hook: the permission layer
| DENY_LIST = ["rm -rf /", "sudo", "shutdown", "reboot", "mkfs", "dd if="]
DESTRUCTIVE = ["rm ", "> /etc/", "chmod 777"] |
Two-tier policy: a DENY_LIST match is rejected outright; a DESTRUCTIVE match requires user confirmation |
| def permission_hook(block):
    if block.name == "bash":
        command = block.input.get("command", "")
        for pattern in DENY_LIST:
            if pattern in command:
                return f"Permission denied: '{pattern}' is on the deny list" |
The argument is a tool_use block; block.name is the tool name; the in operator does a substring check; returning an error string means the call is blocked |
| if any(token in command for token in DESTRUCTIVE):
    print("\n[permission] destructive command")
    choice = input(" Allow? [y/N] ").strip().lower()
    if choice not in ("y", "yes"):
        return "Permission denied by user" |
any() with a generator expression; interactive confirmation; input() blocks until the user answers; .strip().lower() normalizes the input |
| if block.name.startswith("mcp__") and "deploy" in block.name:
    ...
    return "Permission denied by user" |
Destructive MCP tools also need confirmation; str.startswith() and in together form a double check |
| register_hook("UserPromptSubmit", user_prompt_hook)
register_hook("PreToolUse", permission_hook)
register_hook("PreToolUse", log_hook)
register_hook("PostToolUse", large_output_hook)
register_hook("Stop", stop_hook) |
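Registered at module level; PreToolUse has two hooks: permission (can block) runs before log (only prints); multiple hooks run in registration order, and because trigger_hooks short-circuits, log_hook does not run for a call that permission_hook blocks |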
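The two-tier policy can be tried out as a pure function, which also makes the limits of substring matching visible. This is a sketch under assumptions: classify_command is a hypothetical helper that returns a label instead of prompting the user, and it reuses the two lists exactly as shown above.

```python
DENY_LIST = ["rm -rf /", "sudo", "shutdown", "reboot", "mkfs", "dd if="]
DESTRUCTIVE = ["rm ", "> /etc/", "chmod 777"]


def classify_command(command: str) -> str:
    """Return 'deny', 'confirm', or 'allow' using the same substring tests."""
    if any(pattern in command for pattern in DENY_LIST):
        return "deny"
    if any(token in command for token in DESTRUCTIVE):
        return "confirm"
    return "allow"


examples = {
    "sudo apt install jq": classify_command("sudo apt install jq"),
    "rm build/out.txt": classify_command("rm build/out.txt"),
    "ls -la": classify_command("ls -la"),
    # Substring matching over-triggers: "firm " contains "rm ".
    "echo firm grip": classify_command("echo firm grip"),
    # "rm -rf /tmp/x" contains the deny pattern "rm -rf /" as a prefix.
    "rm -rf /tmp/x": classify_command("rm -rf /tmp/x"),
}
```

The last two entries show that plain substring checks err on the side of caution: harmless commands can be flagged, which is an acceptable trade-off for a teaching example but worth knowing before reusing the lists elsewhere.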
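7. spawn_subagent: isolated subagent
New. S19's spawn_teammate starts a long-lived background thread. S20's spawn_subagent is a one-shot, synchronous subagent that returns a text summary when its task is done. It is meant for complex subtasks that need an isolated context.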
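| SUB_SYSTEM = (
    f"You are a coding subagent at {WORKDIR}. "
    "Complete the task, then return a concise final summary. "
    "Do not spawn more agents."
) |
The f-string interpolates WORKDIR; adjacent string literals inside the parentheses are concatenated implicitly; the prompt explicitly forbids spawning further subagents |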
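| def has_tool_use(content) -> bool:
    return any(getattr(block, "type", None) == "tool_use"
               for block in content) |
any() with a generator expression; getattr() with a None default tolerates blocks that have no type attribute (it reads attributes, not keys, so a plain dict block is never reported as tool_use); this check is the continuation criterion and does not rely on stop_reason |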
| def spawn_subagent(description: str) -> str:
    messages = [{"role": "user", "content": description}]
    for _ in range(30): |
At most 30 rounds; description is the task description |
| response = client.messages.create(
    model=MODEL, system=SUB_SYSTEM, messages=messages,
    tools=SUB_TOOLS, max_tokens=8000) |
The subagent uses its own SUB_TOOLS set (bash/read/write/edit/glob, with no teammate or MCP tools) |
| messages.append({"role": "assistant", "content": response.content})
if not has_tool_use(response.content):
    break |
No tool_use block means the subagent is done, so the loop exits |
| blocked = trigger_hooks("PreToolUse", block)
if blocked:
    output = str(blocked)
else:
    handler = SUB_HANDLERS.get(block.name)
    output = call_tool_handler(handler, block.input, block.name)
trigger_hooks("PostToolUse", block, output) |
The subagent goes through the hooks too, so permission_hook applies here as well; commands run by the subagent are subject to the same permission control |
| for msg in reversed(messages):
    if msg["role"] == "assistant":
        text = extract_text(msg["content"])
        if text:
            return text
return "Subagent finished without a text summary." |
Extracts text from the most recent assistant message that has any; reversed() iterates from the end; extract_text handles the list of blocks |
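One detail of has_tool_use is worth checking directly: getattr reads attributes, so it behaves differently for SDK-style objects and for plain dicts. The sketch below compares the function as shown with a hypothetical variant, has_tool_use_any, that accepts both shapes; SimpleNamespace objects stand in for SDK content blocks (an assumption).

```python
from types import SimpleNamespace


def has_tool_use(content) -> bool:
    # Same logic as the excerpt: attribute lookup with a None default.
    return any(getattr(block, "type", None) == "tool_use" for block in content)


def block_type(block):
    """Hypothetical helper: read 'type' from either a dict or an object."""
    if isinstance(block, dict):
        return block.get("type")
    return getattr(block, "type", None)


def has_tool_use_any(content) -> bool:
    return any(block_type(block) == "tool_use" for block in content)


object_blocks = [SimpleNamespace(type="text"), SimpleNamespace(type="tool_use")]
dict_blocks = [{"type": "tool_use", "name": "bash"}]

object_result = has_tool_use(object_blocks)
dict_result = has_tool_use(dict_blocks)        # dict keys are not attributes
dict_result_any = has_tool_use_any(dict_blocks)
```

If a message history is ever reloaded from JSON, its blocks become dicts, and a dual-shape accessor of this kind would be needed for the continuation check to keep working.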
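8. Context compaction system
New. Four compaction layers, applied cumulatively as needed, keep the context window from overflowing.
Overview of the four layers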
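| def tool_result_budget(messages, max_bytes=200_000): |
Layer 1: persist oversized tool results to disk and replace them with a preview |
| def snip_compact(messages, max_messages=50): |
Layer 2: when there are more than 50 messages, cut out the middle and insert [snipped N messages] |
| def micro_compact(messages): |
Layer 3: keep only the 3 most recent tool results in full and shorten the rest |
| def compact_history(messages): |
Layer 4 (heavyweight): have the LLM summarize the entire history into a single message |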
prepare_context: the single entry point
| def prepare_context(messages: list) -> list:
    messages[:] = tool_result_budget(messages)
    messages[:] = snip_compact(messages)
    messages[:] = micro_compact(messages)
    if estimate_size(messages) > CONTEXT_LIMIT:
        messages[:] = compact_history(messages)
    return messages |
messages[:] = ... replaces the list contents in place (the caller's reference stays valid); the layers run in order; the LLM summary (expensive) is invoked only when the first three layers are not enough |
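The reason prepare_context uses slice assignment rather than plain assignment can be shown in a few lines. The two functions below are hypothetical stand-ins for a compaction layer that keeps only the last two messages.

```python
def compact_rebinding(messages: list) -> list:
    messages = messages[-2:]      # rebinds the local name only
    return messages


def compact_in_place(messages: list) -> list:
    messages[:] = messages[-2:]   # mutates the list object the caller holds
    return messages


history_a = ["m1", "m2", "m3", "m4"]
history_b = ["m1", "m2", "m3", "m4"]
compact_rebinding(history_a)      # return value ignored, as agent_loop does
compact_in_place(history_b)
```

Since agent_loop calls prepare_context(messages) without using the return value, only the in-place form makes the compaction visible to the caller.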
persist_large_output example:
tool_use_id = "toolu_abc123"
output = "..." (40000 characters, above PERSIST_THRESHOLD=30000)
→ path = .task_outputs/tool-results/toolu_abc123.txt
→ path.write_text(output)
→ returns "<persisted-output>\nFull output: .task_outputs/.../toolu_abc123.txt\nPreview:\n{output[:2000]}\n</persisted-output>"
The LLM sees the preview plus the path and can call read_file for the full content when needed
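The persistence step traced above might look like the following. This is a sketch reconstructed from the example, not the chapter's code: the results_dir parameter, the PREVIEW_CHARS name, and the choice of a strict "greater than" comparison against the threshold are assumptions.

```python
import tempfile
from pathlib import Path

PERSIST_THRESHOLD = 30000
PREVIEW_CHARS = 2000  # preview length taken from the example above


def persist_large_output(tool_use_id: str, output: str, results_dir: Path) -> str:
    """Write oversized output to disk and return a preview that names the file."""
    if len(output) <= PERSIST_THRESHOLD:
        return output  # small outputs pass through unchanged
    results_dir.mkdir(parents=True, exist_ok=True)  # created on demand
    path = results_dir / f"{tool_use_id}.txt"
    path.write_text(output, encoding="utf-8")
    return (f"<persisted-output>\nFull output: {path}\n"
            f"Preview:\n{output[:PREVIEW_CHARS]}\n</persisted-output>")


with tempfile.TemporaryDirectory() as tmp:
    results_dir = Path(tmp) / ".task_outputs" / "tool-results"
    small = persist_large_output("toolu_small", "ok", results_dir)
    large = persist_large_output("toolu_abc123", "x" * 40000, results_dir)
    saved_chars = len(
        (results_dir / "toolu_abc123.txt").read_text(encoding="utf-8"))
```

The message history then carries roughly 2000 characters instead of 40000, while the full text stays recoverable from disk.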
snip_compact: middle truncation (with tool-pair protection)
| def snip_compact(messages: list, max_messages: int = 50) -> list:
    if len(messages) <= max_messages:
        return messages
    head_end, tail_start = 3, len(messages) - (max_messages - 3) |
Keeps the first 3 and the last 47 messages (50 in total); head_end = 3, tail_start = len(messages) - 47 |
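| if head_end > 0 and message_has_tool_use(messages[head_end - 1]):
    while head_end < len(messages) and is_tool_result_message(messages[head_end]):
        head_end += 1 |
Tool-pair protection: if messages[2] (the last head message) contains a tool_use, the head is extended until it includes the matching tool_result messages; this prevents an orphaned tool_use block |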
| snipped = tail_start - head_end
return (messages[:head_end]
        + [{"role": "user", "content": f"[snipped {snipped} messages]"}]
        + messages[tail_start:]) |
List slices are concatenated with a placeholder message in the middle; snipped is the number of messages actually removed |
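Putting the three fragments together gives a function that can be run on synthetic histories. The snip_compact body below follows the excerpt; the two helper predicates are hypothetical implementations that assume messages are dicts whose content is either a string or a list of dict blocks.

```python
def message_has_tool_use(message: dict) -> bool:
    # Hypothetical helper: true if any content block is a tool_use dict.
    content = message.get("content")
    return isinstance(content, list) and any(
        isinstance(b, dict) and b.get("type") == "tool_use" for b in content)


def is_tool_result_message(message: dict) -> bool:
    # Hypothetical helper: a user message carrying tool_result blocks.
    content = message.get("content")
    return (message.get("role") == "user" and isinstance(content, list)
            and any(isinstance(b, dict) and b.get("type") == "tool_result"
                    for b in content))


def snip_compact(messages: list, max_messages: int = 50) -> list:
    if len(messages) <= max_messages:
        return messages
    head_end, tail_start = 3, len(messages) - (max_messages - 3)
    # Extend the head so a tool_use at its end keeps its tool_result.
    if head_end > 0 and message_has_tool_use(messages[head_end - 1]):
        while (head_end < len(messages)
               and is_tool_result_message(messages[head_end])):
            head_end += 1
    snipped = tail_start - head_end
    return (messages[:head_end]
            + [{"role": "user", "content": f"[snipped {snipped} messages]"}]
            + messages[tail_start:])


# 60 plain messages; roles are illustrative only.
plain = [{"role": "user" if i % 2 == 0 else "assistant", "content": f"m{i}"}
         for i in range(60)]
paired = [dict(m) for m in plain]
paired[2] = {"role": "assistant", "content": [
    {"type": "tool_use", "id": "t1", "name": "bash", "input": {}}]}
paired[3] = {"role": "user", "content": [
    {"type": "tool_result", "tool_use_id": "t1", "content": "ok"}]}

snipped_plain = snip_compact(plain)
snipped_paired = snip_compact(paired)
```

With 60 plain messages the result has 51 entries (3 head, 1 placeholder, 47 tail). When the third message is a tool_use, the head grows by one to keep its tool_result, so one fewer message is snipped.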
9. Error recovery: RecoveryState + with_retry
New. S19 simply returns an error message when the API fails. S20 introduces a full error-recovery state machine.
The RecoveryState state machine
| class RecoveryState:
    def __init__(self):
        self.has_escalated = False
        self.recovery_count = 0
        self.consecutive_529 = 0
        self.has_attempted_reactive_compact = False
        self.current_model = PRIMARY_MODEL |
A fresh instance is created on every agent_loop call; has_escalated records whether max_tokens has already been raised to 16000; consecutive_529 counts consecutive overload errors; current_model is switched dynamically |
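| def retry_delay(attempt: int) -> float:
    base = min(BASE_DELAY_MS * (2 ** attempt), 32000) / 1000
    return base + random.uniform(0, base * 0.25) |
Exponential backoff: 0.5s, 1s, 2s, ... (this sequence assumes BASE_DELAY_MS = 500, which the excerpt does not show); min() caps the base delay at 32s; random.uniform() adds up to 25% jitter so that multiple instances do not retry in lockstep |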
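The delay schedule can be tabulated to see where the cap takes effect. In this sketch BASE_DELAY_MS = 500 is an assumption (the constant is not shown in the excerpt), and delay_bounds is a hypothetical helper that returns the jitter-free minimum and the maximum for a given attempt.

```python
import random

BASE_DELAY_MS = 500   # assumption: not shown in the excerpt
MAX_DELAY_MS = 32000  # cap taken from the excerpt


def retry_delay(attempt: int) -> float:
    base = min(BASE_DELAY_MS * (2 ** attempt), MAX_DELAY_MS) / 1000
    return base + random.uniform(0, base * 0.25)


def delay_bounds(attempt: int) -> tuple[float, float]:
    """Lower bound (no jitter) and upper bound (full 25% jitter) in seconds."""
    base = min(BASE_DELAY_MS * (2 ** attempt), MAX_DELAY_MS) / 1000
    return base, base * 1.25


bounds = [delay_bounds(attempt) for attempt in range(8)]
samples = [retry_delay(attempt) for attempt in range(8)]
```

Under that assumption the base delay doubles from 0.5s and reaches the 32s cap at attempt 6. With MAX_RETRIES = 3, only the first three entries are ever used by with_retry.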
with_retry: retry logic
| def with_retry(fn, state: RecoveryState):
    for attempt in range(MAX_RETRIES):
        try:
            result = fn()
            state.consecutive_529 = 0
            return result |
fn is a zero-argument callable such as lambda: client.messages.create(...); the 529 counter is reset on success |
| except Exception as e:
    name = type(e).__name__.lower()
    msg = str(e).lower()
    if "ratelimit" in name or "429" in msg:
        delay = retry_delay(attempt)
        time.sleep(delay)
        continue |
429 rate limit: back off and retry; type(e).__name__ gives the exception class name; str(e) gives the message; .lower() makes the match case-insensitive |
| if "overloaded" in name or "529" in msg or "overloaded" in msg:
    state.consecutive_529 += 1
    if state.consecutive_529 >= MAX_CONSECUTIVE_529 and FALLBACK_MODEL:
        state.current_model = FALLBACK_MODEL
        state.consecutive_529 = 0 |
529 overloaded: increment the counter; after 2 consecutive failures switch to FALLBACK_MODEL (if one is configured) |
| raise RuntimeError(f"Max retries ({MAX_RETRIES}) exceeded") |
Raised once every retry has failed; the caller's except clause catches it |
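The fragments above can be assembled into a runnable sketch and driven with fake failures. Several parts are assumptions rather than the chapter's code: the model names are placeholders, OverloadedError and RateLimitError are hypothetical stand-ins for SDK exceptions, sleep is injectable so the demo does not wait, the 529 branch backs off before retrying, and unclassified errors are re-raised.

```python
PRIMARY_MODEL = "primary-model"    # placeholder names, not real model IDs
FALLBACK_MODEL = "fallback-model"
MAX_RETRIES = 3
MAX_CONSECUTIVE_529 = 2


class OverloadedError(Exception):
    """Hypothetical stand-in for an SDK 529 error."""


class RateLimitError(Exception):
    """Hypothetical stand-in for an SDK 429 error."""


class RecoveryState:
    def __init__(self):
        self.consecutive_529 = 0
        self.current_model = PRIMARY_MODEL


def with_retry(fn, state, sleep=lambda seconds: None):
    for attempt in range(MAX_RETRIES):
        try:
            result = fn()
            state.consecutive_529 = 0
            return result
        except Exception as e:
            name = type(e).__name__.lower()
            msg = str(e).lower()
            if "ratelimit" in name or "429" in msg:
                sleep(0.5 * 2 ** attempt)
                continue
            if "overloaded" in name or "529" in msg:
                state.consecutive_529 += 1
                if state.consecutive_529 >= MAX_CONSECUTIVE_529 and FALLBACK_MODEL:
                    state.current_model = FALLBACK_MODEL
                    state.consecutive_529 = 0
                sleep(0.5 * 2 ** attempt)  # assumption: back off, then retry
                continue
            raise  # assumption: unclassified errors propagate to the caller
    raise RuntimeError(f"Max retries ({MAX_RETRIES}) exceeded")


state = RecoveryState()
attempts = []


def flaky_call():
    # Reads state.current_model at call time, so a model switch takes effect.
    attempts.append(state.current_model)
    if state.current_model == PRIMARY_MODEL:
        raise OverloadedError("529 overloaded")
    return f"ok from {state.current_model}"


def always_limited():
    raise RateLimitError("429 too many requests")


result = with_retry(flaky_call, state)
try:
    with_retry(always_limited, RecoveryState())
    exhausted = False
except RuntimeError:
    exhausted = True
```

The callable must look up state.current_model when it runs, not when it is created. Otherwise the switch to the fallback model would never reach the API call.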
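10. S20 Agent Loop: the comprehensive version
S20's agent_loop integrates every mechanism from S01-S19. Only the key additions are shown below (parts already present in S19 are not repeated).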
| def agent_loop(messages: list, context: dict):
    global rounds_since_todo
    tools, handlers = assemble_tool_pool()
    state = RecoveryState()
    max_tokens = DEFAULT_MAX_TOKENS |
global declares that a module-level variable will be modified; a new RecoveryState is created for each loop; max_tokens is adjusted dynamically |
| while True:
    fired = consume_cron_queue()
    for job in fired:
        messages.append({"role": "user", "content": f"[Scheduled] {job.prompt}"}) |
Each round starts by injecting due cron jobs; consume_cron_queue() consumes the queue in a thread-safe way |
| inject_background_notifications(messages) |
Inject notifications for background tasks that have finished |
| if rounds_since_todo >= 3:
    messages.append({"role": "user",
                     "content": "<reminder>Update your todos.</reminder>"}) |
Remind the model if todos have not been updated for 3 rounds; the XML tag helps the LLM recognize this as a system reminder |
| prepare_context(messages) |
Four-layer compaction |
| try:
    response = call_llm(messages, context, tools, state, max_tokens)
except Exception as e:
    if is_prompt_too_long_error(e) and not state.has_attempted_reactive_compact:
        messages[:] = reactive_compact(messages)
        state.has_attempted_reactive_compact = True
        continue |
A prompt_too_long error triggers reactive compaction (keeping the 5 most recent messages); it is attempted only once; continue re-enters the loop |
| if response.stop_reason == "max_tokens":
    if not state.has_escalated:
        max_tokens = ESCALATED_MAX_TOKENS
        state.has_escalated = True
        continue
    messages.append({"role": "assistant", "content": response.content})
    if state.recovery_count < MAX_RECOVERY_RETRIES:
        messages.append({"role": "user", "content": CONTINUATION_PROMPT})
        state.recovery_count += 1
        continue |
max_tokens truncation: first retry with the limit raised to 16000; if the response is still truncated after escalation, append the partial response and inject CONTINUATION_PROMPT so the LLM continues; at most MAX_RECOVERY_RETRIES = 2 continuations |
| for block in response.content:
    if block.name == "compact":
        messages[:] = compact_history(messages)
        messages.append({"role": "user",
                         "content": "[Compacted. Continue with summarized context.]"})
        compacted_now = True
        break |
When the LLM itself calls the compact tool, compact immediately and break; the compacted_now flag skips appending tool results (messages has already been reset) |
| blocked = trigger_hooks("PreToolUse", block)
if blocked:
    results.append({"type": "tool_result",
                    "tool_use_id": block.id,
                    "content": str(blocked)})
    continue |
Permission block: the reason is returned as the tool_result; seeing why the call was refused, the LLM can adjust its approach |
11. cron_autorun_loop: background auto-run thread
New. S19 has no cron. S20 re-integrates cron and adds cron_autorun_loop, which runs due cron jobs automatically even when the user is not typing.
| def cron_autorun_loop(history: list, context: dict):
    while True:
        time.sleep(1)
        fired = consume_cron_queue()
        if not fired:
            continue |
Polls every 1s; consume_cron_queue() is thread-safe; when nothing fired, continue (the LLM is not invoked) |
| with agent_lock:
    turn_start = len(history)
    for job in fired:
        history.append({"role": "user",
                        "content": f"[Scheduled] {job.prompt}"})
        terminal_print(f" [cron auto] {job.prompt[:60]}")
    agent_loop(history, context)
    context.update(update_context(context, history))
    print_turn_assistants(history, turn_start) |
The agent_lock mutex keeps cron and user input from calling agent_loop at the same time; turn_start records where this turn begins in history; context.update() updates the dict in place |
Concurrency safety: agent_lock
agent_lock = threading.Lock()
Main thread (user input):
with agent_lock:
    agent_loop(history, context)
Background thread (cron_autorun_loop):
with agent_lock:
    agent_loop(history, context)
The Lock guarantees that only one thread runs agent_loop at a time, so history is never modified concurrently.
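The effect of the lock can be checked with two threads that each append a run of entries to a shared history. This is a toy sketch: fake_agent_loop is a hypothetical stand-in for agent_loop, and the thread names are illustrative.

```python
import threading

agent_lock = threading.Lock()
history = []


def fake_agent_loop(source: str, steps: int = 200) -> None:
    # Stand-in for agent_loop: appends one entry per step.
    for step in range(steps):
        history.append((source, step))


def run_turn(source: str) -> None:
    with agent_lock:  # only one turn may touch history at a time
        fake_agent_loop(source)


threads = [threading.Thread(target=run_turn, args=(name,))
           for name in ("user", "cron")]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

sources = [source for source, _ in history]
# Count how often adjacent entries come from different threads.
switches = sum(1 for a, b in zip(sources, sources[1:]) if a != b)
```

Because each turn holds the lock for its whole duration, the entries of one thread form a single contiguous run. Exactly one switch between sources appears in the history, regardless of which thread wins the lock first.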
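12. Plan approval gate: a real blocking implementation
Improved. In S16, plan approval is "protocol-level" only (it does not block tool execution). S20 implements a real code-level gate: after submit_plan, the teammate stops iterating the LLM until it receives an approval.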
| protocol_ctx = {"waiting_plan": None} |
A dict used as a mutable state container; waiting_plan is the req_id currently awaiting approval, and None means nothing is pending |
| if msg_type == "plan_approval_response":
    approve = meta.get("approve", False)
    if req_id == protocol_ctx["waiting_plan"]:
        protocol_ctx["waiting_plan"] = None |
When an approval response arrives, its req_id is compared with the pending one; on a match the gate is unlocked (waiting_plan = None) |
| if protocol_ctx["waiting_plan"]:
    # Poll only for protocol replies while the approval gate is
    # closed; do not let the model continue with the task.
    time.sleep(IDLE_POLL_INTERVAL)
    continue |
While the gate is closed, the LLM call is skipped and only the inbox is polled; this is a real code-level block (S16 has nothing like it) |
| if block.name == "submit_plan":
    output = _teammate_submit_plan(
        name, block.input.get("plan", ""))
    match = re.search(r"\((req_\d+)\)", output)
    protocol_ctx["waiting_plan"] = (
        match.group(1) if match else output) |
After the plan is submitted, the req_id is extracted from the returned string (for example "Plan submitted (req_042871)") with re.search(); waiting_plan is set to that req_id |
| if protocol_ctx["waiting_plan"]:
    # Ignore later tool_use blocks from the same model
    # response; they belong after approval, not before.
    break |
Any tool_use blocks that follow submit_plan in the same response are ignored; only the submit_plan result is processed, and break exits the for loop |
Full plan approval gate flow:
Teammate thread:
[WORK phase]
LLM → tool_use {name:"submit_plan", plan:"I plan to..."}
→ _teammate_submit_plan("alice","I plan to...") → "Plan submitted (req_042871)"
→ protocol_ctx["waiting_plan"] = "req_042871"
→ break (ignore other tool_use blocks in the same round)
[GATE CLOSED: waiting_plan = "req_042871"]
Next iteration of for _ in range(10):
if protocol_ctx["waiting_plan"]:
    time.sleep(5)
    continue <-- the LLM is not called!
    time.sleep(5)
    continue <-- keeps waiting until the inbox receives an approval
On plan_approval_response (approve=True, req_id=req_042871):
protocol_ctx["waiting_plan"] = None (unlock)
messages.append("[Plan approved] Proceed")
[GATE OPEN]
Next iteration of the for loop: waiting_plan=None → normal LLM call
The LLM sees "[Plan approved] Proceed" → continues with the task
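The gate logic in the flow above reduces to three small operations on protocol_ctx, which can be tested in isolation. The function names below (on_plan_submitted, on_approval_response, may_call_llm) are hypothetical; the regular expression and the state transitions follow the excerpts.

```python
import re

protocol_ctx = {"waiting_plan": None}


def on_plan_submitted(output: str) -> None:
    """Close the gate, keyed by the req_id found in the tool output."""
    match = re.search(r"\((req_\d+)\)", output)
    protocol_ctx["waiting_plan"] = match.group(1) if match else output


def on_approval_response(req_id: str) -> bool:
    """Open the gate only when req_id matches; return True if it opened."""
    if req_id == protocol_ctx["waiting_plan"]:
        protocol_ctx["waiting_plan"] = None
        return True
    return False


def may_call_llm() -> bool:
    # Mirrors `if protocol_ctx["waiting_plan"]: ... continue` in the loop.
    return not protocol_ctx["waiting_plan"]


on_plan_submitted("Plan submitted (req_042871)")
pending_id = protocol_ctx["waiting_plan"]
gate_after_submit = may_call_llm()
stale_opened = on_approval_response("req_000001")   # unrelated request
gate_after_stale = may_call_llm()
matching_opened = on_approval_response("req_042871")
gate_after_approval = may_call_llm()
```

Note the fallback branch: if the output contains no "(req_...)" pattern, the whole output string becomes the pending key. In the excerpts shown, such a gate would reopen only for a response whose req_id equals that string.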