1. 整体功能 — 四层调度架构

问题背景

S13 的后台任务由用户或 LLM 手动触发。S14 引入"时间触发":用类 cron 的方式让 agent 在特定时间自动唤醒执行任务,无需人工干预。

解决方案

四层架构:①调度线程按秒轮询时间 → ②匹配时将 CronJob 写入 cron_queue → ③queue processor 检测到队列有工作时唤醒 agent → ④agent_loop 消费队列注入对话。

在系统中的角色

S14 为整个 agent 系统增加了"时间感知"能力。调度线程与 agent loop 完全解耦,通过队列通信。持久化存储保证重启后任务不丢失。

1
Scheduler 线程
cron_scheduler_loop():每秒轮询,检查 scheduled_jobs 中所有 CronJob 是否匹配当前时间,匹配则写入 cron_queue。
2
Queue(解耦层)
cron_queue:list 充当轻量级队列,调度线程写入、agent_loop 读出。调度线程不直接调用 agent,避免循环依赖。
3
Queue Processor 线程
queue_processor_loop():轮询队列是否有工作 + agent 是否空闲,两者同时满足时获取 agent_lock 并唤醒 agent。
2. 新增 import
from datetime import datetime datetime.datetime:用于获取当前时间 datetime.now() 并提取 minute、hour、day、month、weekday 等字段,供 cron 表达式匹配。S13 无此 import。
3. CronJob 数据类(S14 新增)

设计意图

用 Python dataclass 定义结构化数据,自动生成 __init__、__repr__、__eq__ 等方法。与 S12 的 Task 相同模式,便于 asdict() 序列化到 JSON 文件实现持久化。

@dataclass dataclass 装饰器(decorator)。在类定义时自动生成样板代码。需要从 dataclasses 模块导入(S12 已导入,S14 继承)。
class CronJob: 新类定义。不继承任何父类(隐式继承 object)。
id: str 字段类型注解,格式如 cron_001234。dataclass 将此注解转为 __init__ 参数。
cron: str # "0 9 * * *" 5 字段 cron 表达式字符串。注释给出典型示例:每天 9:00 触发。
prompt: str # message to inject when fired 触发时注入对话的消息文本。agent 收到此消息后开始执行相应任务。
recurring: bool # True = recurring, False = one-shot 布尔标志。True:每次时间匹配都触发(反复执行)。False:触发一次后自动从 scheduled_jobs 删除。
durable: bool # True = persist to disk 持久化标志。True:写入 .scheduled_tasks.json,进程重启后恢复。False:仅在内存中,重启丢失。
CronJob 实例示例CronJob( id="cron_042157", cron="0 9 * * 1-5", # 工作日每天 9:00 prompt="Run daily standup: check task status and update progress", recurring=True, durable=True ) # asdict() 序列化为: { "id": "cron_042157", "cron": "0 9 * * 1-5", "prompt": "Run daily standup...", "recurring": true, "durable": true }
4. 全局状态变量(S14 新增)
DURABLE_PATH = WORKDIR / ".scheduled_tasks.json" 持久化文件路径常量。Path / str 使用 pathlib 的 / 运算符重载拼接路径。约定用 . 开头(隐藏文件)避免与用户文件混淆。
scheduled_jobs: dict[str, CronJob] = {} 运行中的 cron 任务注册表。键为 job_id,值为 CronJob 对象。调度线程每秒遍历此 dict。
cron_queue: list[CronJob] = [] 触发队列。调度线程写入(append),agent_loop 消费(consume_cron_queue 清空)。用 list 而非 queue.Queue 因为 GIL 保护下 list.append 和 list.clear 是原子的(配合 lock 使用更安全)。
cron_lock = threading.Lock() 保护 scheduled_jobs 和 cron_queue 的互斥锁。调度线程、agent_loop、cron 工具函数都会读写这两个结构,必须加锁。
agent_lock = threading.Lock() 保护 agent_loop 不被并发调用的互斥锁。queue_processor_loop 使用 acquire(blocking=False) 非阻塞尝试获锁,以判断 agent 是否空闲。
_last_fired: dict[str, str] = {} 防重复触发 dict。键为 job_id,值为 "YYYY-MM-DD HH:MM" 字符串。若同一任务在同一分钟内调度线程多次轮询到,只触发第一次。日期感知防止跨天重置问题。
5. _cron_field_matches() — 单字段匹配

功能

这是 cron 解析引擎的最小单元:判断一个 cron 字段表达式是否匹配一个整数值。支持四种语法:*(通配)、*/n(步长)、a,b,c(列表)、a-b(范围)、n(精确值)。

def _cron_field_matches(field: str, value: int) -> bool: 前缀 _ 表示模块私有函数,不供外部直接调用。field 是 cron 字段字符串,value 是当前时间对应的整数(如 minute=30)。
if field == "*": 通配符,匹配所有值。最常见情况,优先检查(早期返回优化)。
return True * 始终匹配,直接返回。
if field.startswith("*/"): str.startswith(prefix):检测步长语法,如 */5(每 5 分钟)、*/2(每 2 小时)。
step = int(field[2:]) field[2:]:切片从索引 2 开始到末尾(跳过 "*/")。int():字符串转整数,若非数字会抛 ValueError(此处假设格式已验证)。
return step > 0 and value % step == 0 value % step == 0:取模运算(modulo),value 是 step 的整数倍时为真。step > 0:防止除零错误。短路运算(and):左侧为假则不计算右侧。
if "," in field: 逗号检测,如 1,3,5(周一/三/五)。
return any(_cron_field_matches(f.strip(), value) 递归调用:将逗号分隔的各子字段逐一检查。f.strip() 去除可能的空格。any() 有一个子字段匹配即为真(OR 语义)。
for f in field.split(",")) 生成器表达式(括号内),惰性求值。field.split(",") 返回子字段列表。
if "-" in field: 范围检测,如 1-5(周一到周五)。
lo, hi = field.split("-", 1) 多重赋值(tuple unpacking)。split("-", 1) 最多分割 1 次(防止负数或复合情况),返回 2 个元素列表直接解包到 lo、hi。
return int(lo) <= value <= int(hi) Python 支持链式比较(chained comparison)。等价于 int(lo) <= value and value <= int(hi),更简洁。
return value == int(field) 精确值匹配(fallback)。如字段 "30" 匹配 value==30。
匹配示例_cron_field_matches("*", 30) → True # 通配 _cron_field_matches("*/5", 30) → True # 30 % 5 == 0 _cron_field_matches("*/5", 31) → False # 31 % 5 != 0 _cron_field_matches("1,3,5", 3) → True # 逗号列表,递归匹配 "3" _cron_field_matches("1-5", 3) → True # 1 <= 3 <= 5 _cron_field_matches("1-5", 6) → False # 6 > 5 _cron_field_matches("30", 30) → True # 精确匹配
6. cron_matches() — 完整 cron 表达式匹配

功能

将 5 个 cron 字段组合匹配,实现标准 cron 语义:minute、hour、month 必须全匹配;DOM(月中某天)和 DOW(周中某天)若同时约束则用 OR(任一匹配即可)。

def cron_matches(cron_expr: str, dt: datetime) -> bool: dt: datetime 类型注解引用了 datetime 类。在 S14 中 from datetime import datetime,因此可直接用 datetime 而非 datetime.datetime。
fields = cron_expr.strip().split() str.strip() 去首尾空白,str.split()(无参数)按任意空白符分割。结果为 list,如 ["0","9","*","*","1-5"]
if len(fields) != 5: 快速失败(fail fast):标准 cron 必须是 5 字段,不满足直接返回 False。
return False 早期返回,避免后续 IndexError。
minute, hour, dom, month, dow = fields 序列解包(sequence unpacking):将 5 元素列表一次性赋值给 5 个变量。比 fields[0] 等索引访问更可读。
dow_val = (dt.weekday() + 1) % 7 星期转换:Python datetime.weekday() 返回 0(Mon)…6(Sun)。标准 cron 约定 0=Sunday。加 1 再模 7:Mon→1, Tue→2, …, Sun→0。
m = _cron_field_matches(minute, dt.minute) 对 minute 字段调用单字段匹配函数。dt.minute 是 0-59 的整数。
h = _cron_field_matches(hour, dt.hour) 对 hour 字段匹配,dt.hour 是 0-23。
dom_ok = _cron_field_matches(dom, dt.day) day-of-month,dt.day 是 1-31。变量名用 _ok 后缀,与 m/h 区分(DOM/DOW 有特殊 OR 语义)。
month_ok = _cron_field_matches(month, dt.month) dt.month 是 1-12。
dow_ok = _cron_field_matches(dow, dow_val) 使用转换后的 dow_val(0-6,Sunday=0)。
if not (m and h and month_ok): 分钟、小时、月份必须全部匹配(AND 语义)。括号内是逻辑 AND 表达式。not (...) 若任一不匹配则整体为 True,提前返回 False。
return False 早期返回——大多数时刻分钟/小时就不匹配,避免继续计算 DOM/DOW。
dom_unconstrained = dom == "*" 判断 DOM 字段是否是通配符(未约束)。Python 比较运算符返回 bool。
dow_unconstrained = dow == "*" 同理判断 DOW 字段。
if dom_unconstrained and dow_unconstrained: 两者都是 * → 日期不约束,直接 True(minute/hour/month 已全匹配)。
return True 最常见情况(如 0 9 * * *),提前返回。
if dom_unconstrained: DOM 未约束但 DOW 有约束 → 只看 DOW。
return dow_ok 0 9 * * 1-5(工作日 9 点):只需 DOW 匹配。
if dow_unconstrained: DOW 未约束但 DOM 有约束 → 只看 DOM。
return dom_ok 0 9 1 * *(每月 1 号 9 点):只需 DOM 匹配。
return dom_ok or dow_ok OR 语义:DOM 和 DOW 同时约束时,任一匹配即触发。这是标准 cron 的 POSIX 语义,与 AND 相比更直觉(如 0 9 1 * 1 = 每月 1 号 OR 每周一)。
典型 cron 表达式示例cron_matches("0 9 * * *", dt(09:00 周三)) → True # 每天 9:00 cron_matches("*/5 * * * *", dt(09:30 任何)) → True # 每 5 分钟 cron_matches("0 9 * * 1-5", dt(09:00 周六)) → False # 周末不触发 cron_matches("0 9 * * 1-5", dt(09:00 周一)) → True # 工作日触发 cron_matches("0 9 1 * 1", dt(09:00 月1日,周三)) → True # DOM=1 匹配,OR
7. validate_cron() 与 _validate_cron_field() — 表达式验证

功能

在注册 cron job 前验证表达式合法性,返回错误信息字符串(None 表示合法)。与 cron_matches 解耦,专门处理边界检查。

_validate_cron_field() 关键语法
def _validate_cron_field(field, lo, hi) -> str | None: Union 类型str | None 是 Python 3.10+ 新语法,等价于旧写法 Optional[str]。None 表示无错误,str 是错误信息。
step_str = field[2:] 切片提取步长字符串,不含 "*/"。
if not step_str.isdigit(): str.isdigit():判断字符串是否全为数字字符(0-9)。注意不接受负数和小数点。
err = _validate_cron_field(part.strip(), lo, hi) 递归验证:逗号列表中每个子字段独立验证。
if err: return err 单行 if,发现首个错误立即返回(fail fast)。不收集所有错误。
if a < lo or a > hi or b < lo or b > hi: 范围值越界检查,使用传入的 lo/hi 边界。
if not field.isdigit(): fallback:不是任何已知格式,则验证是否为纯数字。
validate_cron() 主函数
def validate_cron(cron_expr: str) -> str | None: 公开接口(无下划线前缀)。返回 None 表示合法,返回 str 是错误描述。
bounds = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)] 元组列表:对应 minute、hour、DOM、month、DOW 的合法范围。DOM 从 1 开始(无第 0 天),month 同理。
names = ["minute", "hour", "day-of-month", ...] 与 bounds 等长的名称列表,用于构造可读错误消息。
for i, (field, (lo, hi), name) in enumerate( zip + enumerate + 嵌套解包zip(fields, bounds, names) 产生三元组,外层 enumerate 加序号(虽然此处未用 i),内层 (lo, hi) 解包 bounds 元素。Python 允许此类多层解包。
err = _validate_cron_field(field, lo, hi) 对每个字段调用私有验证函数,传入对应范围。
if err: err 为 None(无错误)时 falsy,有错误字符串时 truthy。
return f"{name}: {err}" 在错误前加字段名称,如 "minute: Value 60 out of bounds [0-59]"。
return None 所有字段验证通过,返回 None 表示合法。
验证示例validate_cron("0 9 * * *") → None # 合法 validate_cron("60 9 * * *") → "minute: Value 60 out of bounds [0-59]" validate_cron("0 9 * * 7") → "day-of-week: Value 7 out of bounds [0-6]" validate_cron("*/0 9 * * *") → "minute: Step must be > 0: */0" validate_cron("0 9 1-2") → "Expected 5 fields, got 3"
8. save_durable_jobs() / load_durable_jobs() — 持久化

功能

Cron job 默认持久化到 .scheduled_tasks.json。进程重启后 load_durable_jobs() 恢复所有 durable=True 的任务,保证定时任务跨会话存活。

def save_durable_jobs(): 无返回值(隐式 return None)。每次注册/取消 durable 任务后调用,全量覆写文件(非追加)。
durable = [asdict(j) for j in scheduled_jobs.values() if j.durable] 列表推导:遍历所有 job,过滤 durable=True 的,用 asdict() 转为可序列化 dict。dict.values() 返回值视图。
DURABLE_PATH.write_text(json.dumps(durable, indent=2)) json.dumps(obj, indent=2):序列化为带缩进的 JSON 字符串。Path.write_text():原子覆写文件内容(在大多数 OS 上并非真正原子,但足够简单场景使用)。
def load_durable_jobs(): 程序启动时调用一次,从磁盘恢复持久化任务。
if not DURABLE_PATH.exists(): Path.exists():检查文件是否存在。首次运行无文件则直接返回,不报错。
return 裸 return 等价于 return None,提前退出函数。
try: try/except 保护整个加载过程,防止损坏的 JSON 文件导致程序无法启动。
jobs = json.loads(DURABLE_PATH.read_text()) json.loads(str):反序列化 JSON 字符串为 Python 对象(list of dict)。
for j in jobs: 遍历每个 job dict,逐一恢复为 CronJob 对象。
job = CronJob(**j) **j:字典解包为关键字参数,等价于 CronJob(id=..., cron=..., ...)。dataclass 自动接受这些参数。
err = validate_cron(job.cron) 加载时重新验证,防止文件被手动编辑导致无效表达式。
if err: 有错误则跳过此 job,打印警告但不中止程序。
continue 跳过当前 for 循环迭代,继续下一个。
scheduled_jobs[job.id] = job 将合法 job 注册到内存字典。
valid = [j for j in jobs if j["id"] in scheduled_jobs] 用于统计成功恢复的 job 数量(用原始 dict 而非 CronJob 对象,因为 dict 有 ["id"] 访问)。
except Exception: 捕获所有异常(含 json.JSONDecodeError、KeyError 等)。裸 except 通常不推荐,但这里是兜底保护:文件损坏不应阻止程序启动。
pass 静默忽略异常,scheduled_jobs 保持空。
9. schedule_job() / cancel_job() — 任务注册与取消
schedule_job()
def schedule_job(...) -> CronJob | str: Union 返回类型:成功返回 CronJob 对象,失败返回错误字符串。调用者用 isinstance(result, str) 判断是否出错。
err = validate_cron(cron) 先验证再注册,确保调度线程不会遇到无效表达式。
if err: 验证失败则直接返回错误字符串,不创建 CronJob。
return err 返回 str 类型,与调用者约定的 isinstance 检查对应。
job = CronJob( 创建 CronJob 实例。dataclass 的 __init__ 接受所有字段。
id=f"cron_{random.randint(0, 999999):06d}", random.randint(a, b):含两端的随机整数。:06d:6 位补零格式,避免 ID 碰撞概率约 1/1000000。
with cron_lock: 写入 scheduled_jobs 时加锁,防止调度线程同时遍历时出现 RuntimeError(dict size changed during iteration)。
if durable: 只有 durable=True 的 job 才触发文件写入。持久化是可选的,节省 IO。
save_durable_jobs() 在锁外调用(文件 IO 不应持锁),但 save_durable_jobs 本身不加锁。在单线程写入场景下安全,因为 schedule_job 只被 agent_loop 主线程调用。
cancel_job()
def cancel_job(job_id: str) -> str: 始终返回 str(成功或失败消息),适合直接作为工具返回值给 LLM。
with cron_lock: 加锁删除,与调度线程的遍历互斥。
job = scheduled_jobs.pop(job_id, None) dict.pop(key, default=None):删除并返回,key 不存在时返回 None 而非抛 KeyError。
if not job: None 为 falsy。未找到时返回错误消息。
return f"Job {job_id} not found" 错误消息直接返回,供 LLM 了解取消失败原因。
if job.durable: 取消后若曾经持久化,需要同步更新文件(否则重启后幽灵任务会复活)。
save_durable_jobs() 全量覆写,将已删除的 job 排除在外。
return f"Cancelled {job_id}" 成功消息。
10. cron_scheduler_loop() — 独立调度线程(Layer 1)

功能

这是 S14 最核心的新函数。它作为 daemon 线程运行,每秒轮询一次,检查所有注册的 cron job 是否匹配当前分钟,匹配则将 job 加入 cron_queue。

def cron_scheduler_loop(): 设计为永远运行的无限循环,由 daemon 线程执行。无参数,通过全局变量读取 scheduled_jobs 和写入 cron_queue。
while True: 无限循环,daemon 线程在主进程退出时自动终止,无需手动停止机制。
time.sleep(1) 每次循环开头 sleep 1 秒,实现"每秒轮询"。sleep 放在开头(而非结尾)是为了避免程序启动瞬间就触发任务。
now = datetime.now() 获取本轮循环的当前时间。在循环内部获取以确保每次检查用的是最新时间(而非启动时的快照)。
minute_marker = now.strftime("%Y-%m-%d %H:%M") datetime.strftime(format):格式化时间为字符串。包含日期(%Y-%m-%d)是关键:纯 %H:%M 会导致每天同一时刻只触发一次(跨天后 marker 重复)。
with cron_lock: 持锁遍历 scheduled_jobs,防止并发修改(schedule/cancel 也加锁)。注意:持锁期间 cron_queue.append 也受同一锁保护。
for job in list(scheduled_jobs.values()): list(...):将 dict_values 转为列表快照。这样即使循环中 scheduled_jobs 被修改(如非 recurring job 被删除),迭代也不会报错(RuntimeError: dictionary changed size during iteration)。
try: 每个 job 独立 try/except:一个 job 的错误不影响其他 job 的调度,整个调度线程继续运行。
if cron_matches(job.cron, now): 调用 cron 匹配函数检查当前时间是否满足此 job 的 cron 表达式。
if _last_fired.get(job.id) != minute_marker: 防重触发:同一分钟内(约 60 次循环)只触发一次。若 _last_fired 中已有本分钟的 marker,则跳过。
cron_queue.append(job) 将 CronJob 对象推入触发队列。queue_processor_loop 或 agent_loop 会消费此队列。
_last_fired[job.id] = minute_marker 记录本 job 的最后触发时间标记,防止本分钟内重复触发。
if not job.recurring: one-shot job:触发后立即从 scheduled_jobs 删除。
scheduled_jobs.pop(job.id, None) 此处在持锁内 pop,安全。使用 list() 快照迭代保证此 pop 不影响当前 for 循环。
if job.durable: one-shot + durable:执行后同步删除持久化文件中的记录。
save_durable_jobs() 注意:此处在 cron_lock 内调用 save_durable_jobs()(文件 IO)。这可能短暂阻塞锁,但 1 秒的轮询间隔足够容忍。
except Exception as e: 捕获单个 job 的所有异常(含 cron 解析错误、文件 IO 错误等)。
print(f" \033[31m[cron error] {job.id}: {e}\033[0m") \033[31m:ANSI 红色。打印错误但不 re-raise,调度线程继续运行。
调度线程时序示例(每天 09:00 的 job)08:59:59 sleep(1) 09:00:00 now = datetime(2026-06-19 09:00:00) minute_marker = "2026-06-19 09:00" for job cron_042157 ("0 9 * * *"): cron_matches("0 9 * * *", 09:00) → True _last_fired.get("cron_042157") = None ≠ "2026-06-19 09:00" cron_queue.append(job) ← 触发! _last_fired["cron_042157"] = "2026-06-19 09:00" 09:00:01 sleep(1),再次检查: _last_fired["cron_042157"] = "2026-06-19 09:00" == minute_marker → 跳过(防重触发) 09:01:00 minute_marker = "2026-06-19 09:01" cron_matches("0 9 * * *", 09:01) → False(minute 1 ≠ 0)→ 跳过
11. consume_cron_queue() / has_cron_queue() — 队列消费接口
def consume_cron_queue() -> list[CronJob]: 供 agent_loop 调用(Layer 4),将 cron_queue 全部取出并清空。返回本次消费到的 CronJob 列表。
with cron_lock: 加锁保证与调度线程的 append 操作互斥。
fired = list(cron_queue) 拷贝当前队列内容(不能直接返回原 list,因为后面要 clear)。
cron_queue.clear() list.clear():原地清空列表。此操作与 list.append() 都是线程安全的(Python GIL 保护),但 cron_lock 提供更强的原子性保证。
return fired 返回拷贝的列表。即使返回后调度线程继续 append,不影响已消费的内容。
def has_cron_queue() -> bool: 供 queue_processor_loop 轮询使用,非破坏性(不消费 queue)。比 consume_cron_queue() 成本低(不拷贝列表)。
with cron_lock: 加锁读取以获得一致性视图。
return bool(cron_queue) bool(list):空列表为 False,非空为 True。
12. Cron 工具函数 — run_schedule_cron / run_list_crons / run_cancel_cron

功能

三个对 LLM 暴露的工具处理函数,作为业务逻辑层(schedule_job/cancel_job)的薄包装,统一错误格式化和返回值类型。

def run_schedule_cron(cron, prompt, recurring=True, durable=True) -> str: 参数与 TOOLS schema 对应。recurring 和 durable 有默认值,LLM 可不传。
result = schedule_job(cron, prompt, recurring, durable) 调用核心函数,结果可能是 CronJob 或 str(错误)。
if isinstance(result, str): isinstance(obj, type):运行时类型检查。result 是 str 说明 schedule_job 返回了错误信息。
return f"Error: {result}" 加 "Error: " 前缀帮助 LLM 识别失败情况。
return f"Scheduled {result.id}: '{cron}' → {prompt}" 成功消息包含 job ID(供后续 cancel 使用)和配置摘要。
def run_list_crons() -> str: 无参数。返回人类可读的 cron 列表文本。
with cron_lock: 加锁读取 scheduled_jobs,避免遍历时调度线程修改。
jobs = list(scheduled_jobs.values()) 持锁拷贝为列表,锁外构建字符串(格式化不需要锁)。
tag = "recurring" if j.recurring else "one-shot" 三元表达式:根据 recurring 布尔值选择显示文本。
dur = "durable" if j.durable else "session" 类似地,durable 标记的可读名称。
13. agent_loop() 修改 — cron 注入 + 返回 context

S13 vs S14 的差异

S14 在 agent_loop 开头新增 cron 队列消费和注入,并将返回值从 None 改为 context dict(供 queue_processor_loop 使用)。

def agent_loop(messages: list, context: dict) -> dict: 返回类型从 S13 的隐式 None 改为 dict(context)。queue_processor_loop 需要更新 session_context。
# Layer 4: consume fired cron jobs → inject as messages 注释明确标注这是四层架构的 Layer 4(消费者层)。
fired = consume_cron_queue() 在每次 LLM 调用前先消费队列。若队列为空,fired 是空列表,不注入消息。
for job in fired: 遍历本轮触发的 cron job,逐一注入对话。
messages.append({"role": "user", 以 user 角色注入,LLM 将其视为用户发来的新消息,自动触发 agent 响应。
"content": f"[Scheduled] {job.prompt}"}) [Scheduled] 前缀使 LLM 知道这是定时触发而非用户手动输入,便于 LLM 按 job.prompt 执行对应任务。
except Exception as e: S14 将异常返回从 return(无返回值)改为 return context(有返回值),与新签名一致。
return context 错误时返回当前 context,让 queue_processor_loop 可以正确更新 session_context。
if response.stop_reason != "tool_use": 与 S13 相同,LLM 停止使用工具时退出循环。
return context S14 新增:正常退出也返回 context(S13 是裸 return)。
14. queue_processor_loop() — 队列处理器(Layer 3)

功能

独立 daemon 线程,轮询检查"有 cron 工作 + agent 空闲"两个条件,同时满足时自动唤醒 agent 处理定时任务,无需用户输入。

def queue_processor_loop(): 无限循环,运行在独立 daemon 线程中(主函数启动)。
global session_context 声明修改全局 session_context(在 run_agent_turn_locked 内更新)。
while True: 无限轮询。
time.sleep(0.2) 每 200ms 检查一次队列(比调度线程的 1s 更频繁),实现近实时响应。sleep 放循环开头避免 busy-wait 消耗 CPU。
if not has_cron_queue(): 快速路径:无工作则跳过,不尝试获锁。大多数时刻走此路径。
continue 跳回 while 开头,再次 sleep。
if not agent_lock.acquire(blocking=False): 非阻塞获锁blocking=False 使 acquire 立即返回 True(获锁成功)或 False(锁被持有)。若 agent 正忙(用户在交互),不等待,下次循环再试。
continue agent 忙则跳过,200ms 后再试。
try: try/finally 确保即使 run_agent_turn_locked 内部抛异常,agent_lock 也会被释放。
if not has_cron_queue(): 二次检查(double-check):获锁后再次确认队列非空。防止在"检查到获锁"这段时间窗口内,另一个线程(或用户输入触发的 agent_loop)已经消费了队列。
continue 二次检查发现队列为空,直接 continue(finally 会释锁)。
print("\n \033[35m[queue processor] delivering scheduled work\033[0m") \033[35m:ANSI 紫色。开头 \n 是为了不与用户正在输入的提示符混在同一行。
run_agent_turn_locked() 无参数调用(无用户输入),agent_loop 将消费 cron_queue 中的任务并执行。
finally: 无论 try 块是正常完成还是异常,都执行 finally 块。
agent_lock.release() 释放 agent_lock,允许用户交互或下一轮 queue_processor 运行。
queue_processor_loop 决策树每 200ms: has_cron_queue()? ├── No → continue(快速路径,99.9% 的时刻) └── Yes → agent_lock.acquire(blocking=False)? ├── False (agent 忙) → continue(等 200ms 再试) └── True (agent 空闲) → has_cron_queue()?(二次检查) ├── No → continue(release lock via finally) └── Yes → run_agent_turn_locked() → agent_loop 消费 cron_queue → release agent_lock
15. session_history / session_context / run_agent_turn_locked() / print_latest_assistant_text()

功能

S14 将 main 函数的 history/context 局部变量提升为模块级全局变量,使 queue_processor_loop 可以访问并修改同一会话状态。run_agent_turn_locked 封装了一次 agent 调用的完整流程。

session_history: list = [] 模块级全局变量(S13 在 __main__ 中是局部变量 history)。升级为全局使 queue_processor_loop 可以向此历史添加消息。
session_context = update_context({}, []) 模块加载时初始化 context。queue_processor_loop 通过 global session_context 修改。
def print_latest_assistant_text(messages: list): 从 messages 最后一条 assistant 消息中提取并打印文本内容。独立函数供 run_agent_turn_locked 和 main loop 复用。
if isinstance(content, str): content 可能是 str(简单文本消息)或 list(含多个 block),需要两种处理路径。
if getattr(block, "type", None) == "text": getattr(obj, name, default):安全属性访问,对象无此属性时返回 default 而不抛 AttributeError。block 可能是 SDK 对象(有 .type 属性)或 dict(用 block.get)。
def run_agent_turn_locked(user_query: str | None = None): 参数可选:有用户输入时添加 user 消息,无输入时(queue_processor 调用)直接执行(消费 cron_queue)。调用者必须持有 agent_lock。
global session_context 声明修改全局 session_context。
if user_query is not None: is not None 而非 if user_query,区分"空字符串输入"和"无输入"两种情况(虽然实际上已在 main 过滤了空字符串)。
session_history.append({"role": "user", "content": user_query}) 只有有用户输入时才添加 user 消息,无输入时直接让 agent_loop 消费 cron_queue。
session_context = agent_loop(session_history, session_context) 接收 agent_loop 返回的 context(S14 新增返回值),更新 session_context。
session_context = update_context(session_context, session_history) agent_loop 结束后再次更新 context(如加载最新 memories)。
四层架构全流程示例# 09:00:00 定时触发"Run daily standup" Layer 1 (scheduler thread, 09:00:00): cron_matches("0 9 * * *") → True cron_queue.append(standup_job) Layer 2 (cron_queue): [standup_job] ← 等待消费 Layer 3 (queue_processor_loop, 09:00:00.2): has_cron_queue() → True agent_lock.acquire(blocking=False) → True(用户未在输入) run_agent_turn_locked() ← 无 user_query 参数 Layer 4 (agent_loop via run_agent_turn_locked): fired = consume_cron_queue() → [standup_job] messages.append({"role":"user", "content":"[Scheduled] Run daily standup..."}) → LLM 收到此消息,开始执行 standup 任务 → 工具调用:list_tasks, complete_task, ... → LLM 生成摘要消息 print_latest_assistant_text() agent_lock.release() ← via finally