S12 的 agent loop 是同步的:每次工具调用都阻塞主线程等待结果。当用户要求执行 npm install、pytest 这类耗时几分钟的命令时,agent 和用户都被卡住,无法继续对话。
引入 threading.Thread 将慢操作派发到后台线程,主线程立即返回一个占位符给 LLM。后台线程完成后,结果通过 <task_notification> 格式注入到下一轮对话中。
S13 是 agent 系统从"同步工具执行"升级到"异步任务调度"的关键一步,为后续 S14 的 cron scheduler(独立调度线程)以及 S15 的 teammate 线程奠定基础。
S12 的 import 行是 import os, subprocess, json, time, random,S13 新增了 threading 模块。
| import os, subprocess, json, time, random, threading | threading:Python 标准库线程模块。提供 Thread、Lock 等并发原语。S13 所有后台执行逻辑都依赖此模块。 |
用三个模块级变量追踪所有后台任务的生命周期:计数器产生唯一 ID、两个 dict 分别存元数据和结果、Lock 保证多线程安全。
| _bg_counter = 0 | 模块级整数计数器。每次派发新后台任务时递增,生成唯一 ID(如 bg_0001)。前缀 _ 表示模块私有。 |
| background_tasks: dict[str, dict] = {} | 类型注解:dict[str, dict] 是 Python 3.9+ 泛型写法。键为 bg_id,值为包含 tool_use_id、command、status 的 dict。存储任务元数据。 |
| background_results: dict[str, str] = {} | 键为 bg_id,值为工具执行的字符串输出。与 background_tasks 分开存放,是为了在 collect 时可以一起原子删除(先 pop 元数据再 pop 结果)。 |
| background_lock = threading.Lock() | threading.Lock():互斥锁。同一时刻只有一个线程可以持有它。所有对 background_tasks / background_results 的读写都必须在 with background_lock: 块内进行,防止竞态条件(race condition)。 |
当 LLM 没有明确指定 run_in_background=true 时,这个函数作为兜底启发式规则,判断某个 bash 命令是否应该在后台执行。
| def is_slow_operation(tool_name: str, tool_input: dict) -> bool: | 函数签名。参数类型注解:str 和 dict。返回类型 bool。这是 Python 3.5+ 的类型提示(type hint),不强制运行时检查。 |
| if tool_name != "bash": | 早期返回(early return)模式。只有 bash 工具才可能是慢操作,其他工具(read_file、write_file 等)直接判 False。 |
| return False | 提前退出函数,Python 不需要 else 分支。 |
| cmd = tool_input.get("command", "").lower() | dict.get(key, default):安全取值,key 不存在时返回 "" 而不是抛 KeyError。.lower():转小写以实现大小写不敏感匹配。 |
| slow_keywords = ["install", "build", "test", ...] | 关键词列表字面量(list literal)。包含常见耗时命令前缀:pip install、npm install、cargo build、pytest、make 等。 |
| return any(kw in cmd for kw in slow_keywords) | any(iterable):有一个元素为 True 则整体为 True。kw in cmd:字符串子串检测(substring test),O(n) 操作。生成器表达式(generator expression)惰性求值,找到第一个匹配即短路。 |
整合两种信号:LLM 显式指定(优先级最高)和启发式规则(兜底)。这个函数是后台执行的"守门人"。
| def should_run_background(tool_name: str, tool_input: dict) -> bool: | 同 is_slow_operation 的签名风格。注意两个函数的参数名相同,方便在 agent_loop 中统一调用。 |
| if tool_input.get("run_in_background"): | dict.get() 返回 None 时,if None 为假;返回 True 时为真。LLM 可在 bash 工具调用中传入 "run_in_background": true,这是 S13 新增到 TOOLS schema 中的字段。 |
| return True | 模型明确要求 → 无条件后台执行,不走启发式。 |
| return is_slow_operation(tool_name, tool_input) | 兜底:调用启发式函数。函数调用作为 return 值直接传递,无需中间变量。 |
S12 的 agent_loop 内部直接调用 TOOL_HANDLERS.get(block.name)。S13 将这个逻辑提取为独立函数,原因是它需要在后台线程(worker)中被调用,而后台线程无法访问 agent_loop 的局部作用域。
| def execute_tool(block) -> str: | 参数 block 无类型注解(duck typing):传入任何有 .name 和 .input 属性的对象即可。通常是 Anthropic SDK 的 ToolUseBlock 对象。 |
| handler = TOOL_HANDLERS.get(block.name) | 从模块级 TOOL_HANDLERS 字典查找对应处理函数。dict.get(key) 不存在时返回 None。 |
| if handler: | None 在布尔上下文中为 falsy,可直接用 if 判断。 |
| return handler(**block.input) | ** 解包(dict unpacking):将 block.input 字典的键值对展开为关键字参数传给 handler。等价于 run_bash(command="ls")。 |
| return f"Unknown tool: {block.name}" | f-string 格式化字符串(Python 3.6+)。始终返回字符串而非抛异常,保证后台线程不会因未知工具而崩溃。 |
这是 S13 最核心的新函数。它创建一个 daemon 线程来执行工具调用,并立即返回 bg_id(不等待执行完成)。主线程继续推进对话,后台线程独立运行。
| def start_background_task(block) -> str: | 返回 str(bg_id),而非工具执行结果。调用者立即得到 ID,无需等待。 |
| global _bg_counter | global 声明:告知 Python 此函数内对 _bg_counter 的赋值是修改模块级变量,而非创建局部变量。Python 读取全局变量不需要 global 声明,但写入需要。 |
| _bg_counter += 1 | 递增计数器。注意:这里没有加锁,因为 agent_loop 是单线程调用此函数(只有主线程会派发新任务),所以无竞争。 |
| bg_id = f"bg_{_bg_counter:04d}" | f-string 格式规范 :04d:十进制整数,最少 4 位,不足补零。如 1 → bg_0001,99 → bg_0099。 |
| cmd = block.input.get("command", block.name) | 日志用的可读命令描述。优先取 command 参数,若工具无 command 字段(如 create_task)则用工具名称代替。 |
| def worker(): | 闭包(closure):在 start_background_task 内部定义的嵌套函数。它自动捕获外层作用域的 block、bg_id、background_lock 等变量。 |
| result = execute_tool(block) | 在后台线程中执行工具,可能耗时数秒至数分钟。block 通过闭包引用,不是副本。 |
| with background_lock: | with 语句(上下文管理器):自动调用 background_lock.__enter__()(获锁)和 __exit__()(释锁),即使 block 内抛异常也会释锁。等价于 try/finally acquire/release。 |
| background_tasks[bg_id]["status"] = "completed" | 持锁状态下原子修改 dict 内嵌套值。"status" 从 "running" 改为 "completed",供 collect_background_results() 检测。 |
| background_results[bg_id] = result | 持锁写入结果。与上一行在同一个 with 块内,确保两个写操作要么都可见,要么都不可见(对读取该锁保护的线程而言)。 |
| with background_lock: | 主线程在启动线程之前先注册任务元数据(status="running"),防止后台线程极快完成时 collect 找不到元数据的竞态。 |
| background_tasks[bg_id] = { | 在锁保护下写入初始元数据 dict。 |
| "tool_use_id": block.id, | 保存 Anthropic SDK 的 tool_use_id,供 LLM 对话上下文关联(虽然 collect 最终用 bg_id 而非 tool_use_id 来生成通知)。 |
| "command": cmd, | 可读命令描述,用于日志和通知消息。 |
| "status": "running", | 初始状态,worker 完成后改为 "completed"。trailing comma(尾逗号)是 Python 的合法写法,便于后续添加字段。 |
| thread = threading.Thread(target=worker, daemon=True) | threading.Thread:创建线程对象(不立即启动)。target=worker:线程体函数。daemon=True:守护线程,当主进程退出时自动强制终止,不阻塞程序关闭。 |
| thread.start() | 启动线程,立即返回。此后 worker() 在独立线程中异步执行,当前函数继续向下执行而不等待。 |
| print(f" \033[33m[background] dispatched {bg_id}: {cmd[:40]}\033[0m") | \033[33m:ANSI 转义码,设置终端前景色为黄色。\033[0m:重置颜色。cmd[:40]:切片取前 40 字符,防止日志行过长。 |
| return bg_id | 立即返回 ID,调用者无需等待工具完成。这是"Fire and forget with tracking"模式的核心。 |
在每轮 agent_loop 处理完工具调用后,调用此函数检查哪些后台任务已完成,将结果格式化为 <task_notification> XML 片段,注入到下一轮对话的 user 消息中。
| def collect_background_results() -> list[str]: | 返回 list[str]:每个完成的后台任务对应一个通知字符串。空列表表示本轮无已完成任务。 |
| with background_lock: | 先加锁读 ready_ids,避免遍历 dict 时后台线程并发修改。 |
| ready_ids = [bid for bid, task in background_tasks.items() | 列表推导式(list comprehension)。background_tasks.items() 返回 (key, value) 二元组的视图。bid 是 bg_id,task 是 dict。 |
| if task["status"] == "completed"] | 过滤条件:只收集 status 为 "completed" 的任务 ID。 |
| notifications = [] | 在锁外构建通知列表(IO 操作不应持锁,避免锁竞争)。 |
| for bg_id in ready_ids: | 逐个处理已完成的任务。此时锁已释放,对每个任务重新加锁来删除。 |
| with background_lock: | 每次 pop 都重新加锁,保证线程安全。 |
| task = background_tasks.pop(bg_id) | dict.pop(key):删除并返回指定 key 的值。这是"消费"操作——同一结果不会被重复收割。 |
| output = background_results.pop(bg_id, "") | dict.pop(key, default):key 不存在时返回 "" 而非抛 KeyError(防御性编程,理论上此时结果一定存在)。 |
| summary = output[:200] if len(output) > 200 else output | 三元表达式(conditional expression):Python 写法为 x if condition else y。结果超过 200 字符则截断,避免通知消息过长占用 token。 |
| notifications.append( | list.append(item):O(1) 追加。传入多行 f-string。 |
| f"<task_notification>\n" | XML 格式通知。使用 XML 标签而非 JSON 是因为 Anthropic 文档建议 LLM 对 XML 格式的嵌入内容有更好的理解。\n 是换行符字面量。 |
| f" <task_id>{bg_id}</task_id>\n" | f-string 在花括号内插值。LLM 可从 task_id 追踪这是哪个后台任务的结果。 |
| f" <status>completed</status>\n" | 状态固定为 completed(只有 completed 的任务才被收割)。 |
| f" <summary>{summary}</summary>\n" | 截断后的输出摘要。放在 summary 标签内,使 LLM 知道这是部分输出。 |
| f"</task_notification>") | 关闭根标签。整个字符串通过 f-string 隐式拼接(Python 允许相邻字符串字面量自动拼接)。 |
| return notifications | 返回通知列表。调用者(agent_loop)将其附加到 user 消息的 content 列表中。 |
S12 的 run_bash(command: str) 只接受一个参数。S13 新增 run_in_background: bool = False 参数,但函数体不处理它——派发逻辑在 agent_loop 中。
| def run_bash(command: str, run_in_background: bool = False) -> str: | 新增参数有默认值 False,向后兼容(旧调用方无需改动)。参数接受但忽略,是因为 handler(**block.input) 会将 LLM 传来的 run_in_background 字段展开传入,若函数不接受此参数会报 TypeError。 |
| # run_in_background is handled by agent_loop dispatch, not here | 内联注释说明设计决策:后台派发是 agent_loop 层的横切关注点,不在工具函数层处理。工具函数只关心"执行",不关心"在哪个线程执行"。 |
S12 对每个工具调用直接执行并收集结果。S13 在执行前先判断是否应该后台执行,并在最后收集后台通知一并注入。
| if should_run_background(block.name, block.input): | 调用决策函数。S12 没有这个 if 分支,所有工具都同步执行。 |
| bg_id = start_background_task(block) | 派发到后台线程,立即得到 bg_id(不等待完成)。 |
| results.append({"type": "tool_result", | 必须返回 tool_result 给 LLM(Anthropic API 要求每个 tool_use 都有对应的 tool_result)。 |
| "tool_use_id": block.id, | 与 LLM 请求中的 tool_use block id 对应,API 用此关联。 |
| "content": f"[Background task {bg_id} started]..." | 占位符消息。告知 LLM 任务已开始,结果稍后通知。LLM 据此可继续对话而不卡住等待。 |
| else: | 同步执行分支,逻辑与 S12 相同。 |
| output = execute_tool(block) | S13 改为调用 execute_tool(),而非直接查 TOOL_HANDLERS。统一封装。 |
| user_content = list(results) | list(iterable):浅拷贝。results 是当前轮的 tool_result 列表,拷贝一份以便追加通知而不污染原列表。 |
| bg_notifications = collect_background_results() | 收割本轮已完成的后台任务。可能是空列表。 |
| if bg_notifications: | 非空列表为 truthy,只在有通知时才追加,避免发送空内容。 |
| for notif in bg_notifications: | 遍历通知列表。 |
| user_content.append({"type": "text", "text": notif}) | 通知以 {"type": "text"} 格式追加,而非 {"type": "tool_result"}。这是因为通知不是对某个特定工具调用的回应,而是新的信息输入。 |
| messages.append({"role": "user", "content": user_content}) | 将 tool_results + 可能的通知一并放入单条 user 消息发给 LLM。Anthropic API 要求 tool_result 必须在 user 角色消息中。 |
在 bash 工具的 input_schema 中新增 run_in_background 字段,使 LLM 可以在工具调用时显式声明需要后台执行。
| {"name": "bash", "description": "Run a shell command.", | 工具名和描述与 S12 相同。 |
| "input_schema": {"type": "object", | JSON Schema 格式,Anthropic API 用于参数验证和 LLM 提示生成。 |
| "properties": { | properties 定义参数列表。S12 只有 command。 |
| "command": {"type": "string"}, | 原有参数,不变。 |
| "run_in_background": {"type": "boolean"}}, | S13 新增。类型 boolean(true/false)。不在 required 列表中,因此是可选参数。LLM 可选择性提供此字段。 |
| "required": ["command"]}}, | run_in_background 不在 required 中,与 run_bash() 的默认值 False 呼应。 |