1. 整体功能 — 多 Agent 协作架构

问题背景

S14 的系统只有一个 agent(Lead)。复杂任务需要并行处理,或需要专门技能的 agent。S15 引入"团队"概念:Lead 可以派生 Teammate,Teammate 独立执行子任务后将结果发回 Lead。

S15 的解决方案

三个核心机制:MessageBus(文件基础消息总线,.jsonl mailbox)、spawn_teammate_thread(在后台线程启动独立 agent loop)、inbox 注入(Teammate 结果自动注入 Lead 对话历史)。

在系统中的角色

S15 是整个课程的最高层次:将之前所有组件(Task System、Background Tasks、Cron Scheduler)整合为一个可以自我扩展的多 Agent 系统。Lead 通过工具 spawn/send/check 与 Teammate 协作。

ASCII 协作流程(来自源码注释)
Lead: cron_queue → messages → prompt → LLM → TOOLS ────→ loop ↑ ↓ | └── inbox ← MessageBus ← teammate.send_message ←┘ Teammate: inbox → LLM → bash/read/write/send → loop (max 10 turns) 说明: Lead = 主 agent,运行在主线程(由 agent_lock 保护) Teammate = 子 agent,每个运行在独立 daemon 线程 MessageBus = 通过 .mailboxes/*.jsonl 文件传递消息 inbox = Lead 的邮箱,Teammate 写入,main loop 读取后注入 history max 10 turns = 教学版限制,真实 CC 用 idle loop + shutdown_request
2. MessageBus 类定义与 MAILBOX_DIR

设计模式

MessageBus 使用文件基础消息系统:每个 agent 有一个 .mailboxes/{agent}.jsonl 文件作为收件箱。JSONL(JSON Lines)格式:每行一个 JSON 对象,便于 append 写入。读取时破坏性消费(read + unlink)。

MAILBOX_DIR = WORKDIR / ".mailboxes" 所有 mailbox 文件的根目录。pathlib / 运算符拼接路径。以 . 开头是约定隐藏文件夹(ls 不显示)。
MAILBOX_DIR.mkdir(exist_ok=True) 模块加载时即创建目录。exist_ok=True:目录已存在时不抛 FileExistsError(幂等操作)。
class MessageBus: 普通类定义(非 dataclass)。不继承任何父类,隐式继承 object。有两个方法:send 和 read_inbox。
"""File-based message bus. Each agent has a .jsonl inbox. 类 docstring(Google-style):描述整体设计。
Read is destructive: read_text + unlink (consumes messages). 强调消费语义:读取后文件被删除,消息不可重复消费。这是简单实现的权衡——真实 CC 使用 proper-lockfile 防止并发写入冲突。
Teaching version: no file locking; real CC uses proper-lockfile.""" 教学版说明:无文件锁(potential race condition),但在单机单线程写入的场景下足够安全。
3. MessageBus.send() — 发送消息到收件箱
def send(self, from_agent: str, to_agent: str, content: str, 实例方法,第一个参数 self 是惯例。from_agent、to_agent 是 agent 名称字符串(如 "lead"、"alice")。
msg_type: str = "message"): msg_type 有默认值 "message",最终 result 类型消息传 "result"(在 spawn_teammate_thread 末尾)。
msg = {"from": from_agent, "to": to_agent, 构造消息 dict。键名 "from" 是保留字(Python 关键字),但作为 dict 键字符串完全合法(dict 键是字符串,不是标识符)。
"content": content, "type": msg_type, type 字段帮助 lead 区分普通消息和最终 result(可用于不同处理逻辑,当前 check_inbox 不区分)。
"ts": time.time()} time.time():Unix 时间戳(float,秒)。用于调试和排序(当前代码未排序,仅记录)。
inbox = MAILBOX_DIR / f"{to_agent}.jsonl" 构造收件人的 mailbox 文件路径。f-string 插入 to_agent 名称。.jsonl 后缀表示 JSON Lines 格式。
with open(inbox, "a") as f: open() 模式 "a":追加写入(append)。文件不存在时自动创建;存在时在文件末尾写入,不覆盖原有内容。这使多条消息可以累积到同一文件中。
f.write(json.dumps(msg) + "\n") json.dumps(msg):不带 indent 参数,生成紧凑单行 JSON(JSONL 格式要求每行一个完整 JSON 对象)。+ "\n":追加换行符分隔各消息。
print(f" \033[33m[bus] {from_agent} → {to_agent}: " 黄色日志输出,Unicode → 箭头字符(非 ASCII,Python 3 字符串默认 UTF-8)。
f"{content[:50]}\033[0m") 内容只显示前 50 字符,防止长消息污染终端。
mailbox 文件内容示例 (.mailboxes/lead.jsonl){"from": "alice", "to": "lead", "content": "Tests passed: 42/42", "type": "message", "ts": 1750000000.123} {"from": "bob", "to": "lead", "content": "Deployment complete", "type": "result", "ts": 1750000060.456} # 每行是一个独立的 JSON 对象(JSONL 格式)
4. MessageBus.read_inbox() — 读取并消费收件箱
def read_inbox(self, agent: str) -> list[dict]: 返回 list[dict]:每个消息是一个 dict。空收件箱返回 []。
inbox = MAILBOX_DIR / f"{agent}.jsonl" 构造目标 agent 的 mailbox 路径。
if not inbox.exists(): 文件不存在意味着收件箱为空(或从未收到消息)。
return [] 提前返回空列表,避免后续 FileNotFoundError。
msgs = [json.loads(line) for line in inbox.read_text().splitlines() 列表推导 + JSONL 解析read_text() 读取全文,splitlines() 按换行符分割,json.loads(line) 将每行 JSON 字符串解析为 dict。
if line.strip()] 过滤条件:跳过空行(line.strip() 对空行/仅空白行返回 "",为 falsy)。文件末尾可能有多余的 \n 产生空行。
inbox.unlink() # consume: read + delete Path.unlink():删除文件(等价于 os.remove)。注释明确此操作是"消费"语义。删除后收件箱清空,下次 read_inbox 返回 [](除非有新消息写入)。
return msgs 返回解析后的消息列表。
read_inbox 操作示意调用前:.mailboxes/lead.jsonl 存在,内含 2 条消息 read_inbox("lead"): 1. inbox.exists() → True 2. read_text() → 两行 JSON 字符串 3. [json.loads(line) for ...] → [msg1_dict, msg2_dict] 4. inbox.unlink() → 文件删除 调用后:.mailboxes/lead.jsonl 不存在 下次 read_inbox("lead") → [] 注意:send() 之后再次 read_inbox() 才能读到新消息(文件被删后 send 重新创建)
5. 全局状态 — BUS 实例与 active_teammates
BUS = MessageBus() 模块级单例(singleton)。所有代码共用同一个 MessageBus 实例。Python 中实现单例最简单的方式就是模块级变量(模块只加载一次)。
active_teammates: dict[str, bool] = {} 追踪当前活跃 Teammate 的 dict。键为 teammate 名称,值为 True(仅作存在性标记)。用于防止重复 spawn 同名 Teammate。Teammate 完成后通过 active_teammates.pop(name, None) 移除自己。
6. spawn_teammate_thread() — 派生 Teammate Agent

功能

这是 S15 最核心的新函数。它在后台线程中启动一个独立的迷你 agent loop,配备自己的 system prompt、工具集和对话历史。Teammate 可以使用 bash/read/write/send_message 四个工具,通过 MessageBus 将结果发回 Lead。

def spawn_teammate_thread(name: str, role: str, prompt: str) -> str: 三个参数:name(唯一标识,也是 mailbox 文件名前缀)、role(注入 system prompt 的角色描述)、prompt(第一条 user 消息,告知 Teammate 要做什么)。返回 str 状态消息。
if name in active_teammates: 幂等检查:同名 Teammate 已存在则拒绝重复 spawn,防止重复劳动和资源浪费。
return f"Teammate '{name}' already exists" 错误消息直接返回给 LLM,LLM 可据此调整策略(如等待原 Teammate 完成或换名称)。
system = (f"You are '{name}', a {role}. " Teammate 的 system prompt。括号内多行 f-string(Python 隐式字符串连接,括号内的相邻字符串自动拼接)。
f"Use tools to complete tasks. " 简单指令,鼓励 Teammate 主动使用工具(而非只回复文字)。
f"Send results via send_message to 'lead'.") 关键约束:Teammate 必须将结果发回 lead,否则 Lead 无法得知任务完成。
7. Teammate 内部 agent loop(run() 闭包)

设计

run() 是定义在 spawn_teammate_thread 内的闭包(closure),通过闭包捕获 name、system、prompt 等外层变量。它实现了一个精简版 agent loop,专为 Teammate 设计(有最大轮数限制、工具集更小)。

def run(): 无参数闭包。捕获外层的 name、system、prompt、sub_tools、sub_handlers。这是 Python 闭包的典型用法——内部函数自动引用外层作用域变量。
messages = [{"role": "user", "content": prompt}] Teammate 独立的对话历史,从第一条 user 消息(prompt)开始。与 Lead 的 session_history 完全隔离。
sub_tools = [ Teammate 的工具集,比 Lead 小得多:bash(执行命令)、read_file(读文件)、write_file(写文件)、send_message(与 Lead 通信)。没有 task system 和 cron 工具——Teammate 只专注执行,不负责调度。
{"name": "bash", ...}, bash 工具:Teammate 的 bash 无 run_in_background 字段(教学版 Teammate 不支持嵌套后台任务,简化设计)。
{"name": "send_message", send_message 工具:专用于 Teammate → Lead 通信。schema 中只有 to 和 content,不暴露 from 和 type 参数(由实现层固定)。
sub_handlers = { Teammate 的工具处理器字典,对应 sub_tools 的四个工具。
"bash": run_bash, "read_file": run_read, "write_file": run_write, 直接复用 Lead 的工具函数(闭包引用外层模块级函数)。
"send_message": lambda to, content: (BUS.send(name, to, content), lambda 表达式:匿名函数,捕获外层 name(Teammate 名称)。将 from_agent 固定为 name,只暴露 to 和 content 参数。
"Sent")[1], 元组索引 [1](BUS.send(...), "Sent") 是元组字面量。BUS.send() 返回 None(无返回值),[1] 取 "Sent" 字符串作为工具调用结果返回给 LLM。括号内逗号表达式产生元组(Python 中逗号是元组的核心,括号只是分组)。
for _ in range(10): 有界循环:最多执行 10 轮(10 次 LLM 调用),防止 Teammate 无限运行消耗资源。_:按约定的"不使用的变量"名称,表示循环变量本身不被使用。
inbox = BUS.read_inbox(name) 每轮开始前检查自己的收件箱(Lead 可能发来新指令)。教学版:先读 inbox,如有消息则注入对话,然后继续 LLM 调用。
if inbox: 空列表为 falsy,有消息才注入。
messages.append({"role": "user", 以 user 角色注入 inbox 消息,使 LLM 知晓 Lead 发来的新信息。
"content": f"<inbox>{json.dumps(inbox)}</inbox>"}) 用 XML <inbox> 标签包裹 JSON 序列化的消息列表。混合格式(XML 外层 + JSON 内容)便于 LLM 识别边界。json.dumps(inbox) 将 list[dict] 序列化为 JSON 字符串。
try: 捕获 LLM API 错误,任何异常都 break 退出循环(Teammate 会在 finally 阶段发送最终摘要)。
response = client.messages.create( 使用全局 client 调用 Anthropic API。Teammate 与 Lead 共用同一个 Anthropic 客户端实例(线程安全,HTTP 连接池支持并发)。
model=MODEL, system=system, messages=messages[-20:], messages[-20:]:切片取最近 20 条消息(Python 负索引从末尾计数)。Lead 用完整 messages,Teammate 用滑动窗口——Teammate 任务专一,历史无需太长,也节省 token。
except Exception: 捕获所有异常(含网络错误、rate limit 等)但不打印错误(避免后台线程日志混乱),直接 break 退出 for 循环。
break 退出 for 循环,执行循环外的"发送最终摘要"逻辑。
messages.append({"role": "assistant", "content": response.content}) 将 LLM 回复追加到 Teammate 的对话历史。
if response.stop_reason != "tool_use": LLM 停止工具调用(发出文本回复)→ 任务完成或 LLM 认为无需继续,break 退出 for 循环。
break 自然结束,后续发送最终摘要。
results = [] 本轮工具调用结果列表。
for block in response.content: 遍历 LLM 回复内容,找出 tool_use block。
if block.type == "tool_use": 过滤:只处理工具调用 block(可能有 text block 混在其中)。
handler = sub_handlers.get(block.name) 从 Teammate 专属的 sub_handlers 查找处理器(不用 Lead 的 TOOL_HANDLERS)。
output = handler(**block.input) if handler else "Unknown" 三元表达式:找到 handler 则调用(** 解包参数),否则返回 "Unknown"(Teammate 工具集小,遇到未知工具不应崩溃)。
results.append({"type": "tool_result", 构造 tool_result,格式与 Lead 相同(Anthropic API 要求)。
"tool_use_id": block.id, 与 LLM 请求的 block.id 对应,API 用于关联请求与结果。
"content": str(output)}) str(output):强制转为字符串(output 可能是 None 或其他类型),Anthropic API 的 content 字段要求 str。
messages.append({"role": "user", "content": results}) 将工具结果以 user 消息形式追加(Anthropic 协议要求)。
Teammate 一轮工作示例# 第 1 轮(初始 prompt) messages = [{"role":"user", "content": "写一个 hello.py 并测试它"}] LLM 回复: TextBlock("我来写 hello.py...") ToolUseBlock(id="tu_01", name="write_file", input={"path":"hello.py","content":"print('hello')"}) ToolUseBlock(id="tu_02", name="bash", input={"command":"python hello.py"}) 处理工具: write_file → "Wrote 15 bytes to hello.py" bash → "hello" messages.append(assistant 回复) messages.append(user: [tool_result(tu_01), tool_result(tu_02)]) # 第 2 轮(LLM 看到工具结果) LLM 回复: TextBlock("测试成功,发送结果给 lead") ToolUseBlock(name="send_message", input={"to":"lead","content":"hello.py 已写入并测试成功,输出:hello"}) 处理工具: send_message → BUS.send("alice", "lead", "hello.py 已写入...") → .mailboxes/lead.jsonl 写入消息 stop_reason == "end_turn" (无更多工具调用)→ break
8. Teammate 最终摘要发送与清理

功能

无论 Teammate 正常完成(10 轮用尽、LLM stop_reason 非 tool_use、API 错误),都在退出 for 循环后提取最后一条 assistant 消息发回 Lead,并从 active_teammates 中移除自身。

# Send final summary to Lead 此代码在 for 循环外,break 后(或正常循环结束后)执行。
summary = "Done." 默认摘要,防止遍历失败时发送空内容。
for msg in reversed(messages): reversed():返回反向迭代器(不拷贝列表)。从最新消息向前找,获取最后一条 assistant 消息。
if msg["role"] == "assistant" and isinstance(msg["content"], list): 找 assistant 角色消息,且 content 必须是 list(包含 Block 对象)而非 str(错误消息格式)。
for b in msg["content"]: 遍历 assistant 消息的 content block 列表。
if getattr(b, "type", None) == "text": getattr(obj, attr, default):安全属性访问。b 是 Anthropic SDK 的 Block 对象,有 .type 属性;不确定时用 getattr 防 AttributeError。
summary = b.text 找到 text block,提取文本内容作为摘要。
break 内层 break:退出 for b 循环(找到第一个 text block 即可)。
else: for...else 语法:Python 特有。else 块在 for 循环正常完成(未被 break 中断)时执行。此处:若 for b 没有 break(即 content 中无 text block),则执行 else。
continue 内层 for 正常完成(无 text block)→ continue 继续外层 for msg 循环,找上一条 assistant 消息。
break 内层 for 被 break(找到 text block,summary 已更新)→ 外层 for msg 也 break,不再继续搜索。
BUS.send(name, "lead", summary, "result") 发送最终摘要到 Lead 的收件箱,msg_type="result"(区别于中间的 "message" 类型)。即使 API 出错,Teammate 也会发送默认 "Done." 通知 Lead。
active_teammates.pop(name, None) dict.pop(key, default):线程安全地从 active_teammates 移除自身(pop 带 default 不会在 key 不存在时报错)。清理后 Lead 可以再次 spawn 同名 Teammate。
print(f" \033[32m[teammate] {name} finished\033[0m") \033[32m:ANSI 绿色,表示成功完成。
for...else 语义图解(Python 特有语法)for b in msg["content"]: if getattr(b, "type", None) == "text": summary = b.text break ← 找到 text block,break 内层循环 else: ← 只在 for 正常结束(无 break)时执行 continue ← 无 text block → 继续外层 for msg 循环 break ← 内层 break 后跳到这里 → 也 break 外层循环 效果: - 找到 text block → 更新 summary → 两层 for 都 break → 发送摘要 - 未找到 text block → continue 外层循环 → 搜索上一条 assistant 消息
9. spawn_teammate_thread() 线程启动
active_teammates[name] = True 在启动线程之前先注册,防止同名 Teammate 在 run() 完成之前被再次 spawn(线程启动有延迟)。值为 True,仅作存在性标记(实际不检查值)。
threading.Thread(target=run, daemon=True).start() 创建并立即启动 daemon 线程。S14 的 start_background_task 把 Thread 赋值给 thread 变量再 start(),S15 直接链式调用更简洁(因为 S15 不需要 thread.join() 等操作)。
print(f" \033[36m[teammate] {name} spawned as {role}\033[0m") \033[36m:ANSI 青色。日志区别于其他颜色,便于快速识别 Teammate 生命周期事件。
return f"Teammate '{name}' spawned as {role}" 返回给 LLM 的确认消息,包含 name 和 role,供 LLM 在后续对话中引用。
10. Team 工具函数 — run_spawn_teammate / run_send_message / run_check_inbox
run_spawn_teammate()
def run_spawn_teammate(name, role, prompt) -> str: 薄包装,直接委托给 spawn_teammate_thread()。参数与 TOOLS schema 的 spawn_teammate 工具对应。
return spawn_teammate_thread(name, role, prompt) 单行函数体,无额外处理。返回状态消息字符串。
run_send_message()
def run_send_message(to: str, content: str) -> str: Lead 发送消息到 Teammate 的工具函数。from_agent 固定为 "lead"(Lead 身份)。
BUS.send("lead", to, content) 调用 MessageBus.send(),写入 to 的 mailbox 文件。
return f"Sent to {to}" 返回确认消息。BUS.send() 返回 None,需要单独返回字符串。
run_check_inbox()
def run_check_inbox() -> str: 无参数:始终检查 "lead" 的收件箱(Lead 的工具,只读自己的 mailbox)。
msgs = BUS.read_inbox("lead") 破坏性读取:消费所有当前消息。
if not msgs: 空列表为 falsy。
return "(inbox empty)" 明确提示 LLM 收件箱为空,便于 LLM 决定是否再次检查。
lines = [] 构建可读输出。
for m in msgs: 遍历消息 dict 列表。
lines.append(f" [{m['from']}] {m['content'][:200]}") m['from']:dict 字符串键(不能用 m.from,因为 from 是 Python 关键字)。content[:200] 限制输出长度。
return "\n".join(lines) str.join(iterable):用换行符连接消息列表为单个字符串,格式化返回给 LLM。
check_inbox 输出示例(当有 2 条消息时) [alice] Tests all passed: 42/42 unit tests, 0 failures [bob] Deployment complete: service running on port 8080
11. execute_tool() 更新 — 新增团队工具

变化

S15 的 execute_tool() 在 S14 的基础上新增三个团队工具的 handler 映射。内联 dict 写法(而非引用 TOOL_HANDLERS 全局变量),是 S14 引入的模式。

"spawn_teammate": run_spawn_teammate, S15 新增。Lead 调用此工具时派生 Teammate。
"send_message": run_send_message, "check_inbox": run_check_inbox, S15 新增。Lead 通过这两个工具与 Teammate 双向通信。
12. TOOLS schema 新增 — 三个团队工具
spawn_teammate
{"name": "spawn_teammate", 工具名,与 execute_tool 中的键对应。
"description": "Spawn a teammate agent in a background thread.", 描述文本帮助 LLM 决定何时调用此工具。
"input_schema": {"type": "object", JSON Schema object 类型。
"properties": { 三个必填参数:name、role、prompt。
"name": {"type": "string"}, Teammate 唯一名称(用作 mailbox 文件名)。
"role": {"type": "string"}, 角色描述,注入 Teammate system prompt。
"prompt": {"type": "string"}}, 初始任务指令。
"required": ["name", "role", "prompt"]}}, 三个参数全部必填,无默认值。
send_message
{"name": "send_message", Lead 发送消息给 Teammate 的工具。
"description": "Send a message to a teammate via MessageBus.", 描述提及 MessageBus,帮助 LLM 理解消息传递机制。
"to": {"type": "string"}, 接收者名称(Teammate 的 name 参数)。
"content": {"type": "string"}}, 消息内容文本。
"required": ["to", "content"]}}, 两个参数都必填。
check_inbox
{"name": "check_inbox", Lead 检查自己收件箱的工具。
"description": "Check Lead's inbox for teammate messages.", "Lead's inbox" 明确指出是 Lead 检查自己的 mailbox。
"input_schema": {"type": "object", "properties": {}, 空 properties:无参数工具。properties: {} + required: [] 是 Anthropic API 对无参工具的标准 schema 写法。
"required": []}}, 空 required 列表:无必填参数(当然也无任何参数)。
13. main loop 收件箱注入 — 自动注入 Teammate 结果

S14 vs S15 的 main loop 差异

S14 的 main loop 在 agent_loop 返回后只打印文本。S15 额外检查 Lead 收件箱,将 Teammate 发来的消息注入到 session_history,供下一轮 agent_loop 处理。S15 主动去掉了 S14 的 session_history / session_context 全局变量和 queue_processor_loop,回归更简单的 main loop 结构。

agent_loop(history, context) S15 的 agent_loop 无返回值(恢复为 S13 的 None 返回),不像 S14 返回 context。
context = update_context(context, history) agent_loop 结束后更新 context(与 S13 相同模式)。
for block in history[-1]["content"]: 打印最后一条 assistant 消息(S13 模式)。
# Check inbox for teammate results → inject into history S15 新增注释:主动检查收件箱,将结果注入历史。
inbox = BUS.read_inbox("lead") 每次用户交互结束后(agent_loop 完成后)立即检查收件箱。破坏性读取——消费所有当前消息。
if inbox: 有消息才注入,避免添加空 user 消息。
inbox_text = "\n".join( 将多条消息格式化为单个多行字符串。
f"From {m['from']}: {m['content'][:200]}" for m in inbox) 生成器表达式(括号内):每条消息格式化为 "From alice: ..." 形式。content[:200] 截断长消息。
history.append({"role": "user", 以 user 角色注入,使 Lead 在下一轮对话中能看到 Teammate 的结果。
"content": f"[Inbox]\n{inbox_text}"}) [Inbox] 前缀帮助 LLM 识别这是来自收件箱的内容(而非用户直接输入)。\n 换行后跟格式化的消息列表。
print(f"\n\033[33m[Inbox: {len(inbox)} messages injected]\033[0m") 黄色日志,告知用户有 N 条收件箱消息被注入。len(inbox) 为消息数量。
完整 Lead-Teammate 协作流程# 用户输入: "派一个 alice 帮我写测试文件" 1. user 输入 → history.append(user: "派一个 alice...") 2. agent_loop: LLM → spawn_teammate(name="alice", role="tester", prompt="写 test_hello.py") execute_tool → spawn_teammate_thread("alice", ...) → daemon thread 启动,alice 开始工作 3. 主线程 agent_loop 继续,LLM 输出: "已派 alice,她正在写测试文件" 4. agent_loop 结束(LLM stop_reason = end_turn) 5. main loop: BUS.read_inbox("lead") → [] (alice 还没完成) print() 6. 用户 (5 秒后输入): "check" 或者 alice 在后台完成了: BUS.send("alice", "lead", "test_hello.py 写完了,42 个测试全通过", "result") 7. 下一次 agent_loop 前: BUS.read_inbox("lead") → [{"from":"alice","content":"test_hello.py 写完了..."}] history.append(user: "[Inbox]\nFrom alice: test_hello.py 写完了...") print("[Inbox: 1 messages injected]") 8. 下一轮 agent_loop → LLM 看到 inbox 消息,告知用户 alice 的结果
14. S15 相对 S14 移除的内容(简化)

注意

S15 不仅新增了 MessageBus 和团队工具,还移除了 S14 中的部分结构,简化了架构(回到 S13 风格的 main loop)。

S14 有、S15 无
agent_lock = threading.Lock() S14 用 agent_lock 防止 queue_processor 与用户输入并发执行 agent_loop。S15 移除 queue_processor_loop,无需此锁。
session_history: list = [] S14 提升为模块全局变量(供 queue_processor 访问)。S15 回归 S13 模式:history 是 main loop 内的局部变量。
session_context = update_context({}, []) 同上,S14 全局、S15 局部。
def queue_processor_loop(): S14 的 queue_processor 负责在 agent 空闲时自动处理 cron 任务。S15 没有此函数(cron_queue 仍然存在,由 agent_loop 在用户输入时消费)。
def run_agent_turn_locked(user_query=None): S14 为共享 agent_lock 场景封装的辅助函数。S15 移除。
def print_latest_assistant_text(messages): S14 提取的辅助函数。S15 回归 S13 的内联打印方式(main loop 中直接 for block in history[-1]["content"])。
def agent_loop(...) -> dict: S14 返回 context dict。S15 回归 None 返回(无返回值),context 在 main loop 中单独 update_context() 更新。
S14 main loop(持 agent_lock)
with agent_lock: run_agent_turn_locked(query) # queue_processor 也会持 agent_lock # 调用 run_agent_turn_locked()(无 user_query)
S15 main loop(无锁,简洁)
history.append({"role":"user","content":query}) agent_loop(history, context) context = update_context(context, history) # 打印回复 inbox = BUS.read_inbox("lead") if inbox: history.append({"role":"user","content":"[Inbox]\n..."})

为何移除 queue_processor?

S15 的教学重点是多 Agent 协作,而非自动 cron 唤醒。queue_processor 增加了架构复杂度(需要 agent_lock、全局 session_history),这与 S15 新增的 MessageBus + teammate 复杂度叠加会分散学习焦点。教学版 Teammate 限制 10 轮也是同理——简化以聚焦核心概念。