S16: Team Protocols — 代码解析

相对于 S15 新增:request-response 协议 + request_id 关联 + 状态机 + Teammate Idle Loop


1. 整体功能概览

01
解决什么问题

S15 的 teammate 只能发消息,Lead 无法知道消息是否被响应、响应是否匹配请求。S16 引入 请求-响应协议:每条协议消息携带 request_id,响应端必须回传同一 ID,Lead 通过 ID 精确关联。

02
系统中的角色

在 Agent Teams 架构中,S16 负责「控制层」:shutdown 协议、plan 审批协议均以状态机形式追踪(pending → approved/rejected)。未经 Lead 批准的 plan 不会触发后续工具调用。

03
新增模块列表

ProtocolState 数据类、pending_requests 字典、dispatch_message 路由逻辑、consume_lead_inbox 统一消费器、3 个 Lead 工具 + 1 个 teammate 工具 submit_plan。Teammate 由「10轮限制」升级为「空闲等待循环」。


2. ProtocolState 数据类 & pending_requests

新增S15 无此数据类。S16 用它追踪每一个 in-flight 协议请求。

@dataclass Python 装饰器,自动生成 __init__ / __repr__ / __eq__
class ProtocolState: 定义协议状态数据结构;dataclass 不需要手动写 self.xxx = xxx
request_id: str 类型注解语法;dataclass 字段;唯一标识本次请求
type: str # "shutdown" | "plan_approval" 枚举语义用注释说明,Python 本身不强制;区分协议类型
sender: str 发起方 agent 名称(通常 "lead")
target: str 接收方 agent 名称(teammate 名称)
status: str # pending | approved | rejected 状态机三态;初始 pending,由 match_response() 更新
payload: str # plan text or shutdown reason 携带的内容(plan 文本或 shutdown 原因)
created_at: float = field(default_factory=time.time) field() 允许设置默认值工厂;time.time() 每次实例化时调用
pending_requests: dict[str, ProtocolState] = {} 全局注册表;key = request_id;value = 对应的 ProtocolState 实例
def new_request_id() -> str: 返回类型注解 -> str;生成不碰撞的 ID
return f"req_{random.randint(0, 999999):06d}" f-string 格式化;:06d 补零至6位;e.g. "req_042871"
示例 — 创建 shutdown 请求:
输入: run_request_shutdown("alice")

req_id = "req_042871"
pending_requests["req_042871"] = ProtocolState(
request_id="req_042871", type="shutdown",
sender="lead", target="alice",
status="pending", payload=""
)
→ BUS.send("lead","alice","Please shut down.","shutdown_request",{"request_id":"req_042871"})

3. match_response & consume_lead_inbox

match_response — 状态机转换

def match_response(response_type: str, request_id: str, approve: bool): 接收响应类型、ID、是否批准;无返回值(None)
state = pending_requests.get(request_id) dict.get() 不存在返回 None;避免 KeyError
if not state: None 为 falsy;处理未知 request_id
if state.type == "shutdown" and response_type != "shutdown_response": 类型校验:shutdown 请求只接受 shutdown_response,防止跨协议混淆
if state.status != "pending": 幂等保护:已完成的请求忽略重复响应
state.status = "approved" if approve else "rejected" 三元表达式;直接修改 dataclass 实例属性(就地 mutation)

consume_lead_inbox — 统一消费器

S16 修复S15 中 check_inbox 和主循环各自调用 BUS.read_inbox,消息可能被一方消费后另一方漏读协议响应。S16 将两者统一到一个函数。

def consume_lead_inbox(route_protocol: bool = True) -> list[dict]: 默认参数 = True;返回所有消息供调用方展示
msgs = BUS.read_inbox("lead") 读取并消费 lead 的 .jsonl 信箱(读后删除)
if route_protocol: 允许调用方关闭路由(测试用途)
for msg in msgs: 遍历所有消息,只对协议响应(_response 后缀)执行 match_response
if req_id and msg_type.endswith("_response"): str.endswith() 字符串后缀检测;区分普通 message 和协议响应
return msgs 返回所有消息(含协议消息)供 check_inbox 工具格式化展示
示例 — inbox 路由:
incoming msg: {"type":"shutdown_response","metadata":{"request_id":"req_042871","approve":true}}
→ req_id="req_042871", msg_type="shutdown_response" → endswith("_response") ✓
→ match_response("shutdown_response","req_042871", True)
→ pending_requests["req_042871"].status = "approved"

4. handle_inbox_message — Teammate 消息分发器

新增S15 的 teammate 对所有收到的消息一视同仁注入 history。S16 先按 type 分发,再决定是否继续。

def handle_inbox_message(name: str, msg: dict, messages: list) -> bool: 返回 bool:True = 应停止 teammate 循环(收到 shutdown)
msg_type = msg.get("type", "message") dict.get() 取 type 字段,默认 "message"
meta = msg.get("metadata", {}) 嵌套字典取值;metadata 包含 request_id、approve 等协议元数据
if msg_type == "shutdown_request": 精确匹配协议 type;不使用 startswith 避免误匹配
BUS.send(name, "lead", "Shutting down gracefully.", "shutdown_response", {"request_id": req_id, "approve": True}) 回传 shutdown_response,携带同一 request_id + approve=True;Lead 的 match_response 会据此更新状态
return True # stop the loop 信号:调用方应退出 while 循环
if msg_type == "plan_approval_response": Lead 回复 plan 审批结果时触发
approve = meta.get("approve", False) 从 metadata 读取审批结果;默认 False(拒绝更安全)
return False # continue 非协议消息或 plan 回复:继续循环,LLM 继续工作

5. Teammate Idle Loop — 从「10轮限制」到「空闲等待」

重大变更S15 teammate 最多执行 10 轮后退出。S16 升级为真正的空闲等待:完成任务后不退出,而是轮询 inbox 等待新指令。

S15 旧版(对比)

# S15: 固定 10 轮上限
for _ in range(10):
  inbox = BUS.read_inbox(name)
  ...
  response = client.messages.create(...)
  if response.stop_reason != "tool_use":
    break
# 10轮后直接发 summary 退出

S16 新版(Idle Loop)

# S16: 完成后进入 idle 等待
while not shutdown_requested:
  inbox = BUS.read_inbox(name)
  ...
  response = client.messages.create(...)
  if response.stop_reason != "tool_use":
    # IDLE: 轮询 inbox
    while not shutdown_requested:
      time.sleep(1)
      inbox = BUS.read_inbox(name)
      if inbox: break # 有新消息,回 WORK
shutdown_requested = False 局部变量作为外层 while 的退出信号
while not shutdown_requested: 外层 WORK loop;负条件更直观("没被关闭则继续")
non_protocol = [] 过滤出非协议消息,专门注入 history
for msg in inbox: if msg.get("type") in ("shutdown_request", "plan_approval_response"): should_stop = handle_inbox_message(...) else: non_protocol.append(msg) 逐条分类:协议消息走 dispatch;普通消息存入 non_protocol 列表
# Idle: wait for inbox messages instead of exiting # Real CC sends idle_notification to Lead here while not shutdown_requested: time.sleep(1) inbox = BUS.read_inbox(name) if not inbox: continue 内层 IDLE loop;1s 轮询;无消息则 continue(节省 CPU)

6. 三个新 Lead 协议工具

run_request_shutdown

def run_request_shutdown(teammate: str) -> str: Lead 工具函数;参数 = teammate 名称
req_id = new_request_id() 生成唯一 ID;每次调用生成新 ID
pending_requests[req_id] = ProtocolState( ...type="shutdown"...status="pending"...) 注册到全局字典;状态机起点为 pending
BUS.send("lead", teammate, ..., "shutdown_request", {"request_id": req_id}) 通过 MessageBus 发送;metadata 携带 request_id
输入: run_request_shutdown("alice")
pending_requests["req_042871"] = {status:"pending"}
BUS: lead→alice (shutdown_request)
返回: "Shutdown request sent to alice (req: req_042871)"

run_request_plan

def run_request_plan(teammate: str, task: str) -> str: Lead 请求 teammate 提交 plan;注意本函数不创建 ProtocolState,只发普通消息
BUS.send("lead", teammate, f"Please submit a plan for: {task}", "message") type = "message"(非协议类型);teammate 收到后调用 submit_plan
输入: run_request_plan("alice", "优化 DB 查询")
BUS: lead→alice "Please submit a plan for: 优化 DB 查询" (message)
返回: "Asked alice to submit a plan"

run_review_plan

def run_review_plan(request_id: str, approve: bool, feedback: str = "") -> str: Lead 审批 plan;approve = True/False;feedback 可选
state = pending_requests.get(request_id) 从全局字典查找;不存在返回 None → 错误提示
state.status = "approved" if approve else "rejected" 状态机转换;三元表达式
BUS.send("lead", state.sender, ..., "plan_approval_response", {"request_id": request_id, "approve": approve}) 回传给 teammate;type = plan_approval_response;metadata 含结果

7. _teammate_submit_plan — Teammate 提交 Plan

新增工具S15 无此工具。Teammate 通过 submit_plan 向 Lead 提交工作计划,等待审批后再执行。

def _teammate_submit_plan(from_name: str, plan: str) -> str: 由 teammate 的 sub_handlers["submit_plan"] 调用;私有函数(_前缀)
req_id = new_request_id() 生成本次 plan 请求的唯一 ID
pending_requests[req_id] = ProtocolState( request_id=req_id, type="plan_approval", sender=from_name, target="lead", status="pending", payload=plan) 注册协议状态;payload = plan 文本,供 Lead 查看
BUS.send(from_name, "lead", plan, "plan_approval_request", {"request_id": req_id}) 发给 Lead;content = plan 文本;type = plan_approval_request
return f"Plan submitted ({req_id}). Waiting for approval..." f-string;返回值包含 req_id 供 teammate 的 LLM 在 history 中追踪
注意(教学版局限性):
提交后 teammate 线程仍可继续调用其他工具 — 这是教学版的简化。
真实 CC 中,submit_plan 应阻塞 tool dispatch,直到 plan_approval_response 到达才解锁。
代码注释原文:"Code-level tool gating would require blocking the teammate's tool dispatch until approval arrives."

8. 完整协议流程图

Shutdown 协议流程: Lead MessageBus Teammate (alice) ──── ────────── ─────────────── run_request_shutdown("alice") → pending_requests["req_042871"] = {status:"pending"} → BUS.send("lead","alice","shutdown_request",{request_id}) → alice.jsonl ←──────────────── handle_inbox_message() msg_type == "shutdown_request" BUS.send(alice,"lead","shutdown_response",{approve:True}) → lead.jsonl ────────────────── consume_lead_inbox() → match_response("shutdown_response","req_042871", True) → pending_requests["req_042871"].status = "approved" → shutdown_requested = True (in teammate thread) Plan 审批协议流程: Lead Teammate (alice) ──── ─────────────── run_request_plan("alice","优化 DB") → BUS.send("lead","alice","Please submit a plan...","message") sub_tools "submit_plan" 调用 _teammate_submit_plan(name, plan) → pending_requests[req_id] = {type:"plan_approval"} → BUS.send(alice,"lead",plan,"plan_approval_request") consume_lead_inbox() → check_inbox 工具展示给 Lead run_review_plan(req_id, approve=True, feedback="LGTM") → BUS.send("lead","alice","LGTM","plan_approval_response",{approve:True}) handle_inbox_message() → messages.append("[Plan approved] Proceed")