1. 整体功能 — 这段代码解决什么问题

问题背景

S12 的 agent loop 是同步的:每次工具调用都阻塞主线程等待结果。当用户要求执行 npm installpytest 这类耗时几分钟的命令时,agent 和用户都被卡住,无法继续对话。

S13 的解决方案

引入 threading.Thread 将慢操作派发到后台线程,主线程立即返回一个占位符给 LLM。后台线程完成后,结果通过 <task_notification> 格式注入到下一轮对话中。

在系统中的角色

S13 是 agent 系统从"同步工具执行"升级到"异步任务调度"的关键一步,为后续 S14 的 cron scheduler(独立调度线程)以及 S15 的 teammate 线程奠定基础。


2. import 新增

与 S12 的差异

S12 的 import 行是 import os, subprocess, json, time, random,S13 新增了 threading 模块。

import os, subprocess, json, time, random, threading threading:Python 标准库线程模块。提供 Thread、Lock 等并发原语。S13 所有后台执行逻辑都依赖此模块。
3. 全局状态变量(S13 新增)

设计意图

用三个模块级变量追踪所有后台任务的生命周期:计数器产生唯一 ID、两个 dict 分别存元数据和结果、Lock 保证多线程安全。

_bg_counter = 0 模块级整数计数器。每次派发新后台任务时递增,生成唯一 ID(如 bg_0001)。前缀 _ 表示模块私有。
background_tasks: dict[str, dict] = {} 类型注解dict[str, dict] 是 Python 3.9+ 泛型写法。键为 bg_id,值为包含 tool_use_idcommandstatus 的 dict。存储任务元数据。
background_results: dict[str, str] = {} 键为 bg_id,值为工具执行的字符串输出。与 background_tasks 分开存放,是为了在 collect 时可以一起原子删除(先 pop 元数据再 pop 结果)。
background_lock = threading.Lock() threading.Lock():互斥锁。同一时刻只有一个线程可以持有它。所有对 background_tasks / background_results 的读写都必须在 with background_lock: 块内进行,防止竞态条件(race condition)。
内存结构示例background_tasks = { "bg_0001": { "tool_use_id": "toolu_abc123", # 对应 LLM 请求里的 block.id "command": "npm install", "status": "running" # running → completed } } background_results = { "bg_0001": "added 142 packages in 23s" # 完成后写入 }
4. is_slow_operation() — 慢操作启发式判断

功能

当 LLM 没有明确指定 run_in_background=true 时,这个函数作为兜底启发式规则,判断某个 bash 命令是否应该在后台执行。

def is_slow_operation(tool_name: str, tool_input: dict) -> bool: 函数签名。参数类型注解:strdict。返回类型 bool。这是 Python 3.5+ 的类型提示(type hint),不强制运行时检查。
if tool_name != "bash": 早期返回(early return)模式。只有 bash 工具才可能是慢操作,其他工具(read_file、write_file 等)直接判 False。
return False 提前退出函数,Python 不需要 else 分支。
cmd = tool_input.get("command", "").lower() dict.get(key, default):安全取值,key 不存在时返回 "" 而不是抛 KeyError。.lower():转小写以实现大小写不敏感匹配。
slow_keywords = ["install", "build", "test", ...] 关键词列表字面量(list literal)。包含常见耗时命令前缀:pip installnpm installcargo buildpytestmake 等。
return any(kw in cmd for kw in slow_keywords) any(iterable):有一个元素为 True 则整体为 True。kw in cmd:字符串子串检测(substring test),O(n) 操作。生成器表达式(generator expression)惰性求值,找到第一个匹配即短路。
示例is_slow_operation("bash", {"command": "pip install numpy"}) → True # "install" 在 slow_keywords 中 is_slow_operation("bash", {"command": "ls -la"}) → False # 无匹配关键词 is_slow_operation("read_file", {"path": "README.md"}) → False # tool_name != "bash",提前返回
5. should_run_background() — 决策函数

功能

整合两种信号:LLM 显式指定(优先级最高)和启发式规则(兜底)。这个函数是后台执行的"守门人"。

def should_run_background(tool_name: str, tool_input: dict) -> bool: 同 is_slow_operation 的签名风格。注意两个函数的参数名相同,方便在 agent_loop 中统一调用。
if tool_input.get("run_in_background"): dict.get() 返回 None 时,if None 为假;返回 True 时为真。LLM 可在 bash 工具调用中传入 "run_in_background": true,这是 S13 新增到 TOOLS schema 中的字段。
return True 模型明确要求 → 无条件后台执行,不走启发式。
return is_slow_operation(tool_name, tool_input) 兜底:调用启发式函数。函数调用作为 return 值直接传递,无需中间变量。
决策逻辑示意输入: bash(command="npm install", run_in_background=True) → tool_input.get("run_in_background") = True → return True ✓ 输入: bash(command="npm install") # LLM 未指定 → tool_input.get("run_in_background") = None (falsy) → is_slow_operation("bash", {...}) → True ✓ 输入: bash(command="echo hello") → tool_input.get("run_in_background") = None → is_slow_operation(...) → False → 同步执行
6. execute_tool() — 工具执行包装器

功能

S12 的 agent_loop 内部直接调用 TOOL_HANDLERS.get(block.name)。S13 将这个逻辑提取为独立函数,原因是它需要在后台线程(worker)中被调用,而后台线程无法访问 agent_loop 的局部作用域。

def execute_tool(block) -> str: 参数 block 无类型注解(duck typing):传入任何有 .name.input 属性的对象即可。通常是 Anthropic SDK 的 ToolUseBlock 对象。
handler = TOOL_HANDLERS.get(block.name) 从模块级 TOOL_HANDLERS 字典查找对应处理函数。dict.get(key) 不存在时返回 None。
if handler: None 在布尔上下文中为 falsy,可直接用 if 判断。
return handler(**block.input) ** 解包(dict unpacking):将 block.input 字典的键值对展开为关键字参数传给 handler。等价于 run_bash(command="ls")
return f"Unknown tool: {block.name}" f-string 格式化字符串(Python 3.6+)。始终返回字符串而非抛异常,保证后台线程不会因未知工具而崩溃。
7. start_background_task() — 后台线程派发

功能

这是 S13 最核心的新函数。它创建一个 daemon 线程来执行工具调用,并立即返回 bg_id(不等待执行完成)。主线程继续推进对话,后台线程独立运行。

def start_background_task(block) -> str: 返回 str(bg_id),而非工具执行结果。调用者立即得到 ID,无需等待。
global _bg_counter global 声明:告知 Python 此函数内对 _bg_counter 的赋值是修改模块级变量,而非创建局部变量。Python 读取全局变量不需要 global 声明,但写入需要。
_bg_counter += 1 递增计数器。注意:这里没有加锁,因为 agent_loop 是单线程调用此函数(只有主线程会派发新任务),所以无竞争。
bg_id = f"bg_{_bg_counter:04d}" f-string 格式规范 :04d:十进制整数,最少 4 位,不足补零。如 1 → bg_0001,99 → bg_0099
cmd = block.input.get("command", block.name) 日志用的可读命令描述。优先取 command 参数,若工具无 command 字段(如 create_task)则用工具名称代替。
def worker(): 闭包(closure):在 start_background_task 内部定义的嵌套函数。它自动捕获外层作用域的 block、bg_id、background_lock 等变量。
result = execute_tool(block) 在后台线程中执行工具,可能耗时数秒至数分钟。block 通过闭包引用,不是副本。
with background_lock: with 语句(上下文管理器):自动调用 background_lock.__enter__()(获锁)和 __exit__()(释锁),即使 block 内抛异常也会释锁。等价于 try/finally acquire/release。
background_tasks[bg_id]["status"] = "completed" 持锁状态下原子修改 dict 内嵌套值。"status" 从 "running" 改为 "completed",供 collect_background_results() 检测。
background_results[bg_id] = result 持锁写入结果。与上一行在同一个 with 块内,确保两个写操作要么都可见,要么都不可见(对读取该锁保护的线程而言)。
with background_lock: 主线程在启动线程之前先注册任务元数据(status="running"),防止后台线程极快完成时 collect 找不到元数据的竞态。
background_tasks[bg_id] = { 在锁保护下写入初始元数据 dict。
"tool_use_id": block.id, 保存 Anthropic SDK 的 tool_use_id,供 LLM 对话上下文关联(虽然 collect 最终用 bg_id 而非 tool_use_id 来生成通知)。
"command": cmd, 可读命令描述,用于日志和通知消息。
"status": "running", 初始状态,worker 完成后改为 "completed"。trailing comma(尾逗号)是 Python 的合法写法,便于后续添加字段。
thread = threading.Thread(target=worker, daemon=True) threading.Thread:创建线程对象(不立即启动)。target=worker:线程体函数。daemon=True:守护线程,当主进程退出时自动强制终止,不阻塞程序关闭。
thread.start() 启动线程,立即返回。此后 worker() 在独立线程中异步执行,当前函数继续向下执行而不等待。
print(f" \033[33m[background] dispatched {bg_id}: {cmd[:40]}\033[0m") \033[33m:ANSI 转义码,设置终端前景色为黄色。\033[0m:重置颜色。cmd[:40]:切片取前 40 字符,防止日志行过长。
return bg_id 立即返回 ID,调用者无需等待工具完成。这是"Fire and forget with tracking"模式的核心。
执行时序示例主线程调用 start_background_task(block): t=0ms _bg_counter=1, bg_id="bg_0001" t=0ms 注册 background_tasks["bg_0001"] = {status:"running"} t=0ms thread.start() # 线程开始,但主线程不等待 t=0ms return "bg_0001" ← 主线程继续推进对话 后台线程(并发运行): t=0ms worker() 开始 t=23s npm install 完成 → result = "added 142 packages" t=23s with lock: background_tasks["bg_0001"]["status"] = "completed" t=23s with lock: background_results["bg_0001"] = result # 等待下一轮 collect_background_results() 来收割
8. collect_background_results() — 结果收割与通知构造

功能

在每轮 agent_loop 处理完工具调用后,调用此函数检查哪些后台任务已完成,将结果格式化为 <task_notification> XML 片段,注入到下一轮对话的 user 消息中。

def collect_background_results() -> list[str]: 返回 list[str]:每个完成的后台任务对应一个通知字符串。空列表表示本轮无已完成任务。
with background_lock: 先加锁读 ready_ids,避免遍历 dict 时后台线程并发修改。
ready_ids = [bid for bid, task in background_tasks.items() 列表推导式(list comprehension)。background_tasks.items() 返回 (key, value) 二元组的视图。bid 是 bg_id,task 是 dict。
if task["status"] == "completed"] 过滤条件:只收集 status 为 "completed" 的任务 ID。
notifications = [] 在锁外构建通知列表(IO 操作不应持锁,避免锁竞争)。
for bg_id in ready_ids: 逐个处理已完成的任务。此时锁已释放,对每个任务重新加锁来删除。
with background_lock: 每次 pop 都重新加锁,保证线程安全。
task = background_tasks.pop(bg_id) dict.pop(key):删除并返回指定 key 的值。这是"消费"操作——同一结果不会被重复收割。
output = background_results.pop(bg_id, "") dict.pop(key, default):key 不存在时返回 "" 而非抛 KeyError(防御性编程,理论上此时结果一定存在)。
summary = output[:200] if len(output) > 200 else output 三元表达式(conditional expression):Python 写法为 x if condition else y。结果超过 200 字符则截断,避免通知消息过长占用 token。
notifications.append( list.append(item):O(1) 追加。传入多行 f-string。
f"<task_notification>\n" XML 格式通知。使用 XML 标签而非 JSON 是因为 Anthropic 文档建议 LLM 对 XML 格式的嵌入内容有更好的理解。\n 是换行符字面量。
f" <task_id>{bg_id}</task_id>\n" f-string 在花括号内插值。LLM 可从 task_id 追踪这是哪个后台任务的结果。
f" <status>completed</status>\n" 状态固定为 completed(只有 completed 的任务才被收割)。
f" <summary>{summary}</summary>\n" 截断后的输出摘要。放在 summary 标签内,使 LLM 知道这是部分输出。
f"</task_notification>") 关闭根标签。整个字符串通过 f-string 隐式拼接(Python 允许相邻字符串字面量自动拼接)。
return notifications 返回通知列表。调用者(agent_loop)将其附加到 user 消息的 content 列表中。
输出示例(单个通知)<task_notification> <task_id>bg_0001</task_id> <status>completed</status> <command>npm install</command> <summary>added 142 packages in 23.4s</summary> </task_notification> → 被注入到 messages 中作为 {"type": "text", "text": "..."}
9. run_bash() 签名修改

变化点

S12 的 run_bash(command: str) 只接受一个参数。S13 新增 run_in_background: bool = False 参数,但函数体不处理它——派发逻辑在 agent_loop 中。

def run_bash(command: str, run_in_background: bool = False) -> str: 新增参数有默认值 False,向后兼容(旧调用方无需改动)。参数接受但忽略,是因为 handler(**block.input) 会将 LLM 传来的 run_in_background 字段展开传入,若函数不接受此参数会报 TypeError。
# run_in_background is handled by agent_loop dispatch, not here 内联注释说明设计决策:后台派发是 agent_loop 层的横切关注点,不在工具函数层处理。工具函数只关心"执行",不关心"在哪个线程执行"。
10. agent_loop() 修改 — 分支派发 + 通知注入

S12 vs S13 的差异

S12 对每个工具调用直接执行并收集结果。S13 在执行前先判断是否应该后台执行,并在最后收集后台通知一并注入。

S13 新增:后台分支
if should_run_background(block.name, block.input): 调用决策函数。S12 没有这个 if 分支,所有工具都同步执行。
bg_id = start_background_task(block) 派发到后台线程,立即得到 bg_id(不等待完成)。
results.append({"type": "tool_result", 必须返回 tool_result 给 LLM(Anthropic API 要求每个 tool_use 都有对应的 tool_result)。
"tool_use_id": block.id, 与 LLM 请求中的 tool_use block id 对应,API 用此关联。
"content": f"[Background task {bg_id} started]..." 占位符消息。告知 LLM 任务已开始,结果稍后通知。LLM 据此可继续对话而不卡住等待。
else: 同步执行分支,逻辑与 S12 相同。
output = execute_tool(block) S13 改为调用 execute_tool(),而非直接查 TOOL_HANDLERS。统一封装。
S13 新增:通知注入
user_content = list(results) list(iterable):浅拷贝。results 是当前轮的 tool_result 列表,拷贝一份以便追加通知而不污染原列表。
bg_notifications = collect_background_results() 收割本轮已完成的后台任务。可能是空列表。
if bg_notifications: 非空列表为 truthy,只在有通知时才追加,避免发送空内容。
for notif in bg_notifications: 遍历通知列表。
user_content.append({"type": "text", "text": notif}) 通知以 {"type": "text"} 格式追加,而非 {"type": "tool_result"}。这是因为通知不是对某个特定工具调用的回应,而是新的信息输入。
messages.append({"role": "user", "content": user_content}) 将 tool_results + 可能的通知一并放入单条 user 消息发给 LLM。Anthropic API 要求 tool_result 必须在 user 角色消息中。
一轮 agent_loop 的消息流示例# LLM 请求两个工具:一个快,一个慢 response.content = [ TextBlock("好的,我来安装依赖并查看文件"), ToolUseBlock(id="tu_01", name="bash", input={"command": "npm install", "run_in_background": True}), ToolUseBlock(id="tu_02", name="read_file", input={"path": "README.md"}), ] # 处理结果: results = [ {"type":"tool_result", "tool_use_id":"tu_01", "content": "[Background task bg_0001 started] Command: npm install..."}, # 占位符 {"type":"tool_result", "tool_use_id":"tu_02", "content": "# Project\n..."}, # 同步结果 ] # 本轮恰好有一个更早派发的 bg_0000 完成了: bg_notifications = ["<task_notification><task_id>bg_0000</task_id>..."] # 最终发给 LLM 的 user 消息: user_content = [ {"type":"tool_result", "tool_use_id":"tu_01", ...}, # bg 占位符 {"type":"tool_result", "tool_use_id":"tu_02", ...}, # 同步结果 {"type":"text", "text": "<task_notification>..."}, # 已完成的后台通知 ]

11. TOOLS schema 变化

S13 对 bash 工具 schema 的修改

在 bash 工具的 input_schema 中新增 run_in_background 字段,使 LLM 可以在工具调用时显式声明需要后台执行。

{"name": "bash", "description": "Run a shell command.", 工具名和描述与 S12 相同。
"input_schema": {"type": "object", JSON Schema 格式,Anthropic API 用于参数验证和 LLM 提示生成。
"properties": { properties 定义参数列表。S12 只有 command。
"command": {"type": "string"}, 原有参数,不变。
"run_in_background": {"type": "boolean"}}, S13 新增。类型 boolean(true/false)。不在 required 列表中,因此是可选参数。LLM 可选择性提供此字段。
"required": ["command"]}}, run_in_background 不在 required 中,与 run_bash() 的默认值 False 呼应。