S16: Team Protocols — 代码解析
相对于 S15 新增:request-response 协议 + request_id 关联 + 状态机 + Teammate Idle Loop
1. 整体功能概览
01
解决什么问题
S15 的 teammate 只能发消息,Lead 无法知道消息是否被响应、响应是否匹配请求。S16 引入 请求-响应协议:每条协议消息携带 request_id,响应端必须回传同一 ID,Lead 通过 ID 精确关联。
02
系统中的角色
在 Agent Teams 架构中,S16 负责「控制层」:shutdown 协议、plan 审批协议均以状态机形式追踪(pending → approved/rejected)。未经 Lead 批准的 plan 不会触发后续工具调用。
03
新增模块列表
ProtocolState 数据类、pending_requests 字典、dispatch_message 路由逻辑、consume_lead_inbox 统一消费器、3 个 Lead 工具 + 1 个 teammate 工具 submit_plan。Teammate 由「10轮限制」升级为「空闲等待循环」。
2. ProtocolState 数据类 & pending_requests
新增S15 无此数据类。S16 用它追踪每一个 in-flight 协议请求。
| @dataclass |
Python 装饰器,自动生成 __init__ / __repr__ / __eq__ |
| class ProtocolState: |
定义协议状态数据结构;dataclass 不需要手动写 self.xxx = xxx |
| request_id: str |
类型注解语法;dataclass 字段;唯一标识本次请求 |
| type: str # "shutdown" | "plan_approval" |
枚举语义用注释说明,Python 本身不强制;区分协议类型 |
| sender: str |
发起方 agent 名称(通常 "lead") |
| target: str |
接收方 agent 名称(teammate 名称) |
| status: str # pending | approved | rejected |
状态机三态;初始 pending,由 match_response() 更新 |
| payload: str # plan text or shutdown reason |
携带的内容(plan 文本或 shutdown 原因) |
| created_at: float = field(default_factory=time.time) |
field() 允许设置默认值工厂;time.time() 每次实例化时调用 |
| pending_requests: dict[str, ProtocolState] = {} |
全局注册表;key = request_id;value = 对应的 ProtocolState 实例 |
| def new_request_id() -> str: |
返回类型注解 -> str;生成不碰撞的 ID |
| return f"req_{random.randint(0, 999999):06d}" |
f-string 格式化;:06d 补零至6位;e.g. "req_042871" |
示例 — 创建 shutdown 请求:
输入: run_request_shutdown("alice")
req_id = "req_042871"
pending_requests["req_042871"] = ProtocolState(
request_id="req_042871", type="shutdown",
sender="lead", target="alice",
status="pending", payload=""
)
→ BUS.send("lead","alice","Please shut down.","shutdown_request",{"request_id":"req_042871"})
3. match_response & consume_lead_inbox
match_response — 状态机转换
| def match_response(response_type: str,
request_id: str, approve: bool): |
接收响应类型、ID、是否批准;无返回值(None) |
| state = pending_requests.get(request_id) |
dict.get() 不存在返回 None;避免 KeyError |
| if not state: |
None 为 falsy;处理未知 request_id |
| if state.type == "shutdown" and
response_type != "shutdown_response": |
类型校验:shutdown 请求只接受 shutdown_response,防止跨协议混淆 |
| if state.status != "pending": |
幂等保护:已完成的请求忽略重复响应 |
| state.status = "approved" if approve else "rejected" |
三元表达式;直接修改 dataclass 实例属性(就地 mutation) |
consume_lead_inbox — 统一消费器
S16 修复S15 中 check_inbox 和主循环各自调用 BUS.read_inbox,消息可能被一方消费后另一方漏读协议响应。S16 将两者统一到一个函数。
| def consume_lead_inbox(route_protocol: bool = True)
-> list[dict]: |
默认参数 = True;返回所有消息供调用方展示 |
| msgs = BUS.read_inbox("lead") |
读取并消费 lead 的 .jsonl 信箱(读后删除) |
| if route_protocol: |
允许调用方关闭路由(测试用途) |
| for msg in msgs: |
遍历所有消息,只对协议响应(_response 后缀)执行 match_response |
| if req_id and msg_type.endswith("_response"): |
str.endswith() 字符串后缀检测;区分普通 message 和协议响应 |
| return msgs |
返回所有消息(含协议消息)供 check_inbox 工具格式化展示 |
示例 — inbox 路由:
incoming msg: {"type":"shutdown_response","metadata":{"request_id":"req_042871","approve":true}}
→ req_id="req_042871", msg_type="shutdown_response" → endswith("_response") ✓
→ match_response("shutdown_response","req_042871", True)
→ pending_requests["req_042871"].status = "approved"
4. handle_inbox_message — Teammate 消息分发器
新增S15 的 teammate 对所有收到的消息一视同仁注入 history。S16 先按 type 分发,再决定是否继续。
| def handle_inbox_message(name: str, msg: dict,
messages: list) -> bool: |
返回 bool:True = 应停止 teammate 循环(收到 shutdown) |
| msg_type = msg.get("type", "message") |
dict.get() 取 type 字段,默认 "message" |
| meta = msg.get("metadata", {}) |
嵌套字典取值;metadata 包含 request_id、approve 等协议元数据 |
| if msg_type == "shutdown_request": |
精确匹配协议 type;不使用 startswith 避免误匹配 |
| BUS.send(name, "lead", "Shutting down gracefully.",
"shutdown_response",
{"request_id": req_id, "approve": True}) |
回传 shutdown_response,携带同一 request_id + approve=True;Lead 的 match_response 会据此更新状态 |
| return True # stop the loop |
信号:调用方应退出 while 循环 |
| if msg_type == "plan_approval_response": |
Lead 回复 plan 审批结果时触发 |
| approve = meta.get("approve", False) |
从 metadata 读取审批结果;默认 False(拒绝更安全) |
| return False # continue |
非协议消息或 plan 回复:继续循环,LLM 继续工作 |
5. Teammate Idle Loop — 从「10轮限制」到「空闲等待」
重大变更S15 teammate 最多执行 10 轮后退出。S16 升级为真正的空闲等待:完成任务后不退出,而是轮询 inbox 等待新指令。
S15 旧版(对比)
for _ in range(10):
inbox = BUS.read_inbox(name)
...
response = client.messages.create(...)
if response.stop_reason != "tool_use":
break
# 10轮后直接发 summary 退出
S16 新版(Idle Loop)
while not shutdown_requested:
inbox = BUS.read_inbox(name)
...
response = client.messages.create(...)
if response.stop_reason != "tool_use":
# IDLE: 轮询 inbox
while not shutdown_requested:
time.sleep(1)
inbox = BUS.read_inbox(name)
if inbox: break
| shutdown_requested = False |
局部变量作为外层 while 的退出信号 |
| while not shutdown_requested: |
外层 WORK loop;负条件更直观("没被关闭则继续") |
| non_protocol = [] |
过滤出非协议消息,专门注入 history |
| for msg in inbox:
if msg.get("type") in ("shutdown_request",
"plan_approval_response"):
should_stop = handle_inbox_message(...)
else:
non_protocol.append(msg) |
逐条分类:协议消息走 dispatch;普通消息存入 non_protocol 列表 |
| # Idle: wait for inbox messages instead of exiting
# Real CC sends idle_notification to Lead here
while not shutdown_requested:
time.sleep(1)
inbox = BUS.read_inbox(name)
if not inbox:
continue |
内层 IDLE loop;1s 轮询;无消息则 continue(节省 CPU) |
7. _teammate_submit_plan — Teammate 提交 Plan
新增工具S15 无此工具。Teammate 通过 submit_plan 向 Lead 提交工作计划,等待审批后再执行。
| def _teammate_submit_plan(from_name: str, plan: str) -> str: |
由 teammate 的 sub_handlers["submit_plan"] 调用;私有函数(_前缀) |
| req_id = new_request_id() |
生成本次 plan 请求的唯一 ID |
| pending_requests[req_id] = ProtocolState(
request_id=req_id, type="plan_approval",
sender=from_name, target="lead",
status="pending", payload=plan) |
注册协议状态;payload = plan 文本,供 Lead 查看 |
| BUS.send(from_name, "lead", plan,
"plan_approval_request",
{"request_id": req_id}) |
发给 Lead;content = plan 文本;type = plan_approval_request |
| return f"Plan submitted ({req_id}). Waiting for approval..." |
f-string;返回值包含 req_id 供 teammate 的 LLM 在 history 中追踪 |
注意(教学版局限性):
提交后 teammate 线程仍可继续调用其他工具 — 这是教学版的简化。
真实 CC 中,submit_plan 应阻塞 tool dispatch,直到 plan_approval_response 到达才解锁。
代码注释原文:"Code-level tool gating would require blocking the teammate's tool dispatch until approval arrives."
8. 完整协议流程图
Shutdown 协议流程:
Lead MessageBus Teammate (alice)
──── ────────── ───────────────
run_request_shutdown("alice")
→ pending_requests["req_042871"] = {status:"pending"}
→ BUS.send("lead","alice","shutdown_request",{request_id})
→ alice.jsonl ←────────────────
handle_inbox_message()
msg_type == "shutdown_request"
BUS.send(alice,"lead","shutdown_response",{approve:True})
→ lead.jsonl ──────────────────
consume_lead_inbox()
→ match_response("shutdown_response","req_042871", True)
→ pending_requests["req_042871"].status = "approved"
→ shutdown_requested = True (in teammate thread)
Plan 审批协议流程:
Lead Teammate (alice)
──── ───────────────
run_request_plan("alice","优化 DB")
→ BUS.send("lead","alice","Please submit a plan...","message")
sub_tools "submit_plan" 调用
_teammate_submit_plan(name, plan)
→ pending_requests[req_id] = {type:"plan_approval"}
→ BUS.send(alice,"lead",plan,"plan_approval_request")
consume_lead_inbox() → check_inbox 工具展示给 Lead
run_review_plan(req_id, approve=True, feedback="LGTM")
→ BUS.send("lead","alice","LGTM","plan_approval_response",{approve:True})
handle_inbox_message()
→ messages.append("[Plan approved] Proceed")