问题背景
S13 的后台任务由用户或 LLM 手动触发。S14 引入"时间触发":用类 cron 的方式让 agent 在特定时间自动唤醒执行任务,无需人工干预。
解决方案
四层架构:①调度线程按秒轮询时间 → ②匹配时将 CronJob 写入 cron_queue → ③queue processor 检测到队列有工作时唤醒 agent → ④agent_loop 消费队列注入对话。
在系统中的角色
S14 为整个 agent 系统增加了"时间感知"能力。调度线程与 agent loop 完全解耦,通过队列通信。持久化存储保证重启后任务不丢失。
1
Scheduler 线程
cron_scheduler_loop():每秒轮询,检查 scheduled_jobs 中所有 CronJob 是否匹配当前时间,匹配则写入 cron_queue。
2
Queue(解耦层)
cron_queue:list 充当轻量级队列,调度线程写入、agent_loop 读出。调度线程不直接调用 agent,避免循环依赖。
3
Queue Processor 线程
queue_processor_loop():轮询队列是否有工作 + agent 是否空闲,两者同时满足时获取 agent_lock 并唤醒 agent。
| from datetime import datetime |
datetime.datetime:用于获取当前时间 datetime.now() 并提取 minute、hour、day、month、weekday 等字段,供 cron 表达式匹配。S13 无此 import。 |
设计意图
用 Python dataclass 定义结构化数据,自动生成 __init__、__repr__、__eq__ 等方法。与 S12 的 Task 相同模式,便于 asdict() 序列化到 JSON 文件实现持久化。
| @dataclass |
dataclass 装饰器(decorator)。在类定义时自动生成样板代码。需要从 dataclasses 模块导入(S12 已导入,S14 继承)。 |
| class CronJob: |
新类定义。不继承任何父类(隐式继承 object)。 |
| id: str |
字段类型注解,格式如 cron_001234。dataclass 将此注解转为 __init__ 参数。 |
| cron: str # "0 9 * * *" |
5 字段 cron 表达式字符串。注释给出典型示例:每天 9:00 触发。 |
| prompt: str # message to inject when fired |
触发时注入对话的消息文本。agent 收到此消息后开始执行相应任务。 |
| recurring: bool # True = recurring, False = one-shot |
布尔标志。True:每次时间匹配都触发(反复执行)。False:触发一次后自动从 scheduled_jobs 删除。 |
| durable: bool # True = persist to disk |
持久化标志。True:写入 .scheduled_tasks.json,进程重启后恢复。False:仅在内存中,重启丢失。 |
CronJob 实例示例CronJob(
id="cron_042157",
cron="0 9 * * 1-5", # 工作日每天 9:00
prompt="Run daily standup: check task status and update progress",
recurring=True,
durable=True
)
# asdict() 序列化为:
{
"id": "cron_042157",
"cron": "0 9 * * 1-5",
"prompt": "Run daily standup...",
"recurring": true,
"durable": true
}
| DURABLE_PATH = WORKDIR / ".scheduled_tasks.json" |
持久化文件路径常量。Path / str 使用 pathlib 的 / 运算符重载拼接路径。约定用 . 开头(隐藏文件)避免与用户文件混淆。 |
| scheduled_jobs: dict[str, CronJob] = {} |
运行中的 cron 任务注册表。键为 job_id,值为 CronJob 对象。调度线程每秒遍历此 dict。 |
| cron_queue: list[CronJob] = [] |
触发队列。调度线程写入(append),agent_loop 消费(consume_cron_queue 清空)。用 list 而非 queue.Queue 因为 GIL 保护下 list.append 和 list.clear 是原子的(配合 lock 使用更安全)。 |
| cron_lock = threading.Lock() |
保护 scheduled_jobs 和 cron_queue 的互斥锁。调度线程、agent_loop、cron 工具函数都会读写这两个结构,必须加锁。 |
| agent_lock = threading.Lock() |
保护 agent_loop 不被并发调用的互斥锁。queue_processor_loop 使用 acquire(blocking=False) 非阻塞尝试获锁,以判断 agent 是否空闲。 |
| _last_fired: dict[str, str] = {} |
防重复触发 dict。键为 job_id,值为 "YYYY-MM-DD HH:MM" 字符串。若同一任务在同一分钟内调度线程多次轮询到,只触发第一次。日期感知防止跨天重置问题。 |
功能
这是 cron 解析引擎的最小单元:判断一个 cron 字段表达式是否匹配一个整数值。支持四种语法:*(通配)、*/n(步长)、a,b,c(列表)、a-b(范围)、n(精确值)。
| def _cron_field_matches(field: str, value: int) -> bool: |
前缀 _ 表示模块私有函数,不供外部直接调用。field 是 cron 字段字符串,value 是当前时间对应的整数(如 minute=30)。 |
| if field == "*": |
通配符,匹配所有值。最常见情况,优先检查(早期返回优化)。 |
| return True |
* 始终匹配,直接返回。 |
| if field.startswith("*/"): |
str.startswith(prefix):检测步长语法,如 */5(每 5 分钟)、*/2(每 2 小时)。 |
| step = int(field[2:]) |
field[2:]:切片从索引 2 开始到末尾(跳过 "*/")。int():字符串转整数,若非数字会抛 ValueError(此处假设格式已验证)。 |
| return step > 0 and value % step == 0 |
value % step == 0:取模运算(modulo),value 是 step 的整数倍时为真。step > 0:防止除零错误。短路运算(and):左侧为假则不计算右侧。 |
| if "," in field: |
逗号检测,如 1,3,5(周一/三/五)。 |
| return any(_cron_field_matches(f.strip(), value) |
递归调用:将逗号分隔的各子字段逐一检查。f.strip() 去除可能的空格。any() 有一个子字段匹配即为真(OR 语义)。 |
| for f in field.split(",")) |
生成器表达式(括号内),惰性求值。field.split(",") 返回子字段列表。 |
| if "-" in field: |
范围检测,如 1-5(周一到周五)。 |
| lo, hi = field.split("-", 1) |
多重赋值(tuple unpacking)。split("-", 1) 最多分割 1 次(防止负数或复合情况),返回 2 个元素列表直接解包到 lo、hi。 |
| return int(lo) <= value <= int(hi) |
Python 支持链式比较(chained comparison)。等价于 int(lo) <= value and value <= int(hi),更简洁。 |
| return value == int(field) |
精确值匹配(fallback)。如字段 "30" 匹配 value==30。 |
匹配示例_cron_field_matches("*", 30) → True # 通配
_cron_field_matches("*/5", 30) → True # 30 % 5 == 0
_cron_field_matches("*/5", 31) → False # 31 % 5 != 0
_cron_field_matches("1,3,5", 3) → True # 逗号列表,递归匹配 "3"
_cron_field_matches("1-5", 3) → True # 1 <= 3 <= 5
_cron_field_matches("1-5", 6) → False # 6 > 5
_cron_field_matches("30", 30) → True # 精确匹配
功能
将 5 个 cron 字段组合匹配,实现标准 cron 语义:minute、hour、month 必须全匹配;DOM(月中某天)和 DOW(周中某天)若同时约束则用 OR(任一匹配即可)。
| def cron_matches(cron_expr: str, dt: datetime) -> bool: |
dt: datetime 类型注解引用了 datetime 类。在 S14 中 from datetime import datetime,因此可直接用 datetime 而非 datetime.datetime。 |
| fields = cron_expr.strip().split() |
str.strip() 去首尾空白,str.split()(无参数)按任意空白符分割。结果为 list,如 ["0","9","*","*","1-5"]。 |
| if len(fields) != 5: |
快速失败(fail fast):标准 cron 必须是 5 字段,不满足直接返回 False。 |
| return False |
早期返回,避免后续 IndexError。 |
| minute, hour, dom, month, dow = fields |
序列解包(sequence unpacking):将 5 元素列表一次性赋值给 5 个变量。比 fields[0] 等索引访问更可读。 |
| dow_val = (dt.weekday() + 1) % 7 |
星期转换:Python datetime.weekday() 返回 0(Mon)…6(Sun)。标准 cron 约定 0=Sunday。加 1 再模 7:Mon→1, Tue→2, …, Sun→0。 |
| m = _cron_field_matches(minute, dt.minute) |
对 minute 字段调用单字段匹配函数。dt.minute 是 0-59 的整数。 |
| h = _cron_field_matches(hour, dt.hour) |
对 hour 字段匹配,dt.hour 是 0-23。 |
| dom_ok = _cron_field_matches(dom, dt.day) |
day-of-month,dt.day 是 1-31。变量名用 _ok 后缀,与 m/h 区分(DOM/DOW 有特殊 OR 语义)。 |
| month_ok = _cron_field_matches(month, dt.month) |
dt.month 是 1-12。 |
| dow_ok = _cron_field_matches(dow, dow_val) |
使用转换后的 dow_val(0-6,Sunday=0)。 |
| if not (m and h and month_ok): |
分钟、小时、月份必须全部匹配(AND 语义)。括号内是逻辑 AND 表达式。not (...) 若任一不匹配则整体为 True,提前返回 False。 |
| return False |
早期返回——大多数时刻分钟/小时就不匹配,避免继续计算 DOM/DOW。 |
| dom_unconstrained = dom == "*" |
判断 DOM 字段是否是通配符(未约束)。Python 比较运算符返回 bool。 |
| dow_unconstrained = dow == "*" |
同理判断 DOW 字段。 |
| if dom_unconstrained and dow_unconstrained: |
两者都是 * → 日期不约束,直接 True(minute/hour/month 已全匹配)。 |
| return True |
最常见情况(如 0 9 * * *),提前返回。 |
| if dom_unconstrained: |
DOM 未约束但 DOW 有约束 → 只看 DOW。 |
| return dow_ok |
如 0 9 * * 1-5(工作日 9 点):只需 DOW 匹配。 |
| if dow_unconstrained: |
DOW 未约束但 DOM 有约束 → 只看 DOM。 |
| return dom_ok |
如 0 9 1 * *(每月 1 号 9 点):只需 DOM 匹配。 |
| return dom_ok or dow_ok |
OR 语义:DOM 和 DOW 同时约束时,任一匹配即触发。这是标准 cron 的 POSIX 语义,与 AND 相比更直觉(如 0 9 1 * 1 = 每月 1 号 OR 每周一)。 |
典型 cron 表达式示例cron_matches("0 9 * * *", dt(09:00 周三)) → True # 每天 9:00
cron_matches("*/5 * * * *", dt(09:30 任何)) → True # 每 5 分钟
cron_matches("0 9 * * 1-5", dt(09:00 周六)) → False # 周末不触发
cron_matches("0 9 * * 1-5", dt(09:00 周一)) → True # 工作日触发
cron_matches("0 9 1 * 1", dt(09:00 月1日,周三)) → True # DOM=1 匹配,OR
功能
在注册 cron job 前验证表达式合法性,返回错误信息字符串(None 表示合法)。与 cron_matches 解耦,专门处理边界检查。
_validate_cron_field() 关键语法
| def _validate_cron_field(field, lo, hi) -> str | None: |
Union 类型:str | None 是 Python 3.10+ 新语法,等价于旧写法 Optional[str]。None 表示无错误,str 是错误信息。 |
| step_str = field[2:] |
切片提取步长字符串,不含 "*/"。 |
| if not step_str.isdigit(): |
str.isdigit():判断字符串是否全为数字字符(0-9)。注意不接受负数和小数点。 |
| err = _validate_cron_field(part.strip(), lo, hi) |
递归验证:逗号列表中每个子字段独立验证。 |
| if err: return err |
单行 if,发现首个错误立即返回(fail fast)。不收集所有错误。 |
| if a < lo or a > hi or b < lo or b > hi: |
范围值越界检查,使用传入的 lo/hi 边界。 |
| if not field.isdigit(): |
fallback:不是任何已知格式,则验证是否为纯数字。 |
validate_cron() 主函数
| def validate_cron(cron_expr: str) -> str | None: |
公开接口(无下划线前缀)。返回 None 表示合法,返回 str 是错误描述。 |
| bounds = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)] |
元组列表:对应 minute、hour、DOM、month、DOW 的合法范围。DOM 从 1 开始(无第 0 天),month 同理。 |
| names = ["minute", "hour", "day-of-month", ...] |
与 bounds 等长的名称列表,用于构造可读错误消息。 |
| for i, (field, (lo, hi), name) in enumerate( |
zip + enumerate + 嵌套解包:zip(fields, bounds, names) 产生三元组,外层 enumerate 加序号(虽然此处未用 i),内层 (lo, hi) 解包 bounds 元素。Python 允许此类多层解包。 |
| err = _validate_cron_field(field, lo, hi) |
对每个字段调用私有验证函数,传入对应范围。 |
| if err: |
err 为 None(无错误)时 falsy,有错误字符串时 truthy。 |
| return f"{name}: {err}" |
在错误前加字段名称,如 "minute: Value 60 out of bounds [0-59]"。 |
| return None |
所有字段验证通过,返回 None 表示合法。 |
验证示例validate_cron("0 9 * * *") → None # 合法
validate_cron("60 9 * * *") → "minute: Value 60 out of bounds [0-59]"
validate_cron("0 9 * * 7") → "day-of-week: Value 7 out of bounds [0-6]"
validate_cron("*/0 9 * * *") → "minute: Step must be > 0: */0"
validate_cron("0 9 1-2") → "Expected 5 fields, got 3"
功能
Cron job 默认持久化到 .scheduled_tasks.json。进程重启后 load_durable_jobs() 恢复所有 durable=True 的任务,保证定时任务跨会话存活。
| def save_durable_jobs(): |
无返回值(隐式 return None)。每次注册/取消 durable 任务后调用,全量覆写文件(非追加)。 |
| durable = [asdict(j) for j in scheduled_jobs.values() if j.durable] |
列表推导:遍历所有 job,过滤 durable=True 的,用 asdict() 转为可序列化 dict。dict.values() 返回值视图。 |
| DURABLE_PATH.write_text(json.dumps(durable, indent=2)) |
json.dumps(obj, indent=2):序列化为带缩进的 JSON 字符串。Path.write_text():原子覆写文件内容(在大多数 OS 上并非真正原子,但足够简单场景使用)。 |
| def load_durable_jobs(): |
程序启动时调用一次,从磁盘恢复持久化任务。 |
| if not DURABLE_PATH.exists(): |
Path.exists():检查文件是否存在。首次运行无文件则直接返回,不报错。 |
| return |
裸 return 等价于 return None,提前退出函数。 |
| try: |
try/except 保护整个加载过程,防止损坏的 JSON 文件导致程序无法启动。 |
| jobs = json.loads(DURABLE_PATH.read_text()) |
json.loads(str):反序列化 JSON 字符串为 Python 对象(list of dict)。 |
| for j in jobs: |
遍历每个 job dict,逐一恢复为 CronJob 对象。 |
| job = CronJob(**j) |
**j:字典解包为关键字参数,等价于 CronJob(id=..., cron=..., ...)。dataclass 自动接受这些参数。 |
| err = validate_cron(job.cron) |
加载时重新验证,防止文件被手动编辑导致无效表达式。 |
| if err: |
有错误则跳过此 job,打印警告但不中止程序。 |
| continue |
跳过当前 for 循环迭代,继续下一个。 |
| scheduled_jobs[job.id] = job |
将合法 job 注册到内存字典。 |
| valid = [j for j in jobs if j["id"] in scheduled_jobs] |
用于统计成功恢复的 job 数量(用原始 dict 而非 CronJob 对象,因为 dict 有 ["id"] 访问)。 |
| except Exception: |
捕获所有异常(含 json.JSONDecodeError、KeyError 等)。裸 except 通常不推荐,但这里是兜底保护:文件损坏不应阻止程序启动。 |
| pass |
静默忽略异常,scheduled_jobs 保持空。 |
schedule_job()
| def schedule_job(...) -> CronJob | str: |
Union 返回类型:成功返回 CronJob 对象,失败返回错误字符串。调用者用 isinstance(result, str) 判断是否出错。 |
| err = validate_cron(cron) |
先验证再注册,确保调度线程不会遇到无效表达式。 |
| if err: |
验证失败则直接返回错误字符串,不创建 CronJob。 |
| return err |
返回 str 类型,与调用者约定的 isinstance 检查对应。 |
| job = CronJob( |
创建 CronJob 实例。dataclass 的 __init__ 接受所有字段。 |
| id=f"cron_{random.randint(0, 999999):06d}", |
random.randint(a, b):含两端的随机整数。:06d:6 位补零格式,避免 ID 碰撞概率约 1/1000000。 |
| with cron_lock: |
写入 scheduled_jobs 时加锁,防止调度线程同时遍历时出现 RuntimeError(dict size changed during iteration)。 |
| if durable: |
只有 durable=True 的 job 才触发文件写入。持久化是可选的,节省 IO。 |
| save_durable_jobs() |
在锁外调用(文件 IO 不应持锁),但 save_durable_jobs 本身不加锁。在单线程写入场景下安全,因为 schedule_job 只被 agent_loop 主线程调用。 |
cancel_job()
| def cancel_job(job_id: str) -> str: |
始终返回 str(成功或失败消息),适合直接作为工具返回值给 LLM。 |
| with cron_lock: |
加锁删除,与调度线程的遍历互斥。 |
| job = scheduled_jobs.pop(job_id, None) |
dict.pop(key, default=None):删除并返回,key 不存在时返回 None 而非抛 KeyError。 |
| if not job: |
None 为 falsy。未找到时返回错误消息。 |
| return f"Job {job_id} not found" |
错误消息直接返回,供 LLM 了解取消失败原因。 |
| if job.durable: |
取消后若曾经持久化,需要同步更新文件(否则重启后幽灵任务会复活)。 |
| save_durable_jobs() |
全量覆写,将已删除的 job 排除在外。 |
| return f"Cancelled {job_id}" |
成功消息。 |
功能
这是 S14 最核心的新函数。它作为 daemon 线程运行,每秒轮询一次,检查所有注册的 cron job 是否匹配当前分钟,匹配则将 job 加入 cron_queue。
| def cron_scheduler_loop(): |
设计为永远运行的无限循环,由 daemon 线程执行。无参数,通过全局变量读取 scheduled_jobs 和写入 cron_queue。 |
| while True: |
无限循环,daemon 线程在主进程退出时自动终止,无需手动停止机制。 |
| time.sleep(1) |
每次循环开头 sleep 1 秒,实现"每秒轮询"。sleep 放在开头(而非结尾)是为了避免程序启动瞬间就触发任务。 |
| now = datetime.now() |
获取本轮循环的当前时间。在循环内部获取以确保每次检查用的是最新时间(而非启动时的快照)。 |
| minute_marker = now.strftime("%Y-%m-%d %H:%M") |
datetime.strftime(format):格式化时间为字符串。包含日期(%Y-%m-%d)是关键:纯 %H:%M 会导致每天同一时刻只触发一次(跨天后 marker 重复)。 |
| with cron_lock: |
持锁遍历 scheduled_jobs,防止并发修改(schedule/cancel 也加锁)。注意:持锁期间 cron_queue.append 也受同一锁保护。 |
| for job in list(scheduled_jobs.values()): |
list(...):将 dict_values 转为列表快照。这样即使循环中 scheduled_jobs 被修改(如非 recurring job 被删除),迭代也不会报错(RuntimeError: dictionary changed size during iteration)。 |
| try: |
每个 job 独立 try/except:一个 job 的错误不影响其他 job 的调度,整个调度线程继续运行。 |
| if cron_matches(job.cron, now): |
调用 cron 匹配函数检查当前时间是否满足此 job 的 cron 表达式。 |
| if _last_fired.get(job.id) != minute_marker: |
防重触发:同一分钟内(约 60 次循环)只触发一次。若 _last_fired 中已有本分钟的 marker,则跳过。 |
| cron_queue.append(job) |
将 CronJob 对象推入触发队列。queue_processor_loop 或 agent_loop 会消费此队列。 |
| _last_fired[job.id] = minute_marker |
记录本 job 的最后触发时间标记,防止本分钟内重复触发。 |
| if not job.recurring: |
one-shot job:触发后立即从 scheduled_jobs 删除。 |
| scheduled_jobs.pop(job.id, None) |
此处在持锁内 pop,安全。使用 list() 快照迭代保证此 pop 不影响当前 for 循环。 |
| if job.durable: |
one-shot + durable:执行后同步删除持久化文件中的记录。 |
| save_durable_jobs() |
注意:此处在 cron_lock 内调用 save_durable_jobs()(文件 IO)。这可能短暂阻塞锁,但 1 秒的轮询间隔足够容忍。 |
| except Exception as e: |
捕获单个 job 的所有异常(含 cron 解析错误、文件 IO 错误等)。 |
| print(f" \033[31m[cron error] {job.id}: {e}\033[0m") |
\033[31m:ANSI 红色。打印错误但不 re-raise,调度线程继续运行。 |
调度线程时序示例(每天 09:00 的 job)08:59:59 sleep(1)
09:00:00 now = datetime(2026-06-19 09:00:00)
minute_marker = "2026-06-19 09:00"
for job cron_042157 ("0 9 * * *"):
cron_matches("0 9 * * *", 09:00) → True
_last_fired.get("cron_042157") = None ≠ "2026-06-19 09:00"
cron_queue.append(job) ← 触发!
_last_fired["cron_042157"] = "2026-06-19 09:00"
09:00:01 sleep(1),再次检查:
_last_fired["cron_042157"] = "2026-06-19 09:00" == minute_marker
→ 跳过(防重触发)
09:01:00 minute_marker = "2026-06-19 09:01"
cron_matches("0 9 * * *", 09:01) → False(minute 1 ≠ 0)→ 跳过
| def consume_cron_queue() -> list[CronJob]: |
供 agent_loop 调用(Layer 4),将 cron_queue 全部取出并清空。返回本次消费到的 CronJob 列表。 |
| with cron_lock: |
加锁保证与调度线程的 append 操作互斥。 |
| fired = list(cron_queue) |
拷贝当前队列内容(不能直接返回原 list,因为后面要 clear)。 |
| cron_queue.clear() |
list.clear():原地清空列表。此操作与 list.append() 都是线程安全的(Python GIL 保护),但 cron_lock 提供更强的原子性保证。 |
| return fired |
返回拷贝的列表。即使返回后调度线程继续 append,不影响已消费的内容。 |
| def has_cron_queue() -> bool: |
供 queue_processor_loop 轮询使用,非破坏性(不消费 queue)。比 consume_cron_queue() 成本低(不拷贝列表)。 |
| with cron_lock: |
加锁读取以获得一致性视图。 |
| return bool(cron_queue) |
bool(list):空列表为 False,非空为 True。 |
S13 vs S14 的差异
S14 在 agent_loop 开头新增 cron 队列消费和注入,并将返回值从 None 改为 context dict(供 queue_processor_loop 使用)。
| def agent_loop(messages: list, context: dict) -> dict: |
返回类型从 S13 的隐式 None 改为 dict(context)。queue_processor_loop 需要更新 session_context。 |
| # Layer 4: consume fired cron jobs → inject as messages |
注释明确标注这是四层架构的 Layer 4(消费者层)。 |
| fired = consume_cron_queue() |
在每次 LLM 调用前先消费队列。若队列为空,fired 是空列表,不注入消息。 |
| for job in fired: |
遍历本轮触发的 cron job,逐一注入对话。 |
| messages.append({"role": "user", |
以 user 角色注入,LLM 将其视为用户发来的新消息,自动触发 agent 响应。 |
| "content": f"[Scheduled] {job.prompt}"}) |
[Scheduled] 前缀使 LLM 知道这是定时触发而非用户手动输入,便于 LLM 按 job.prompt 执行对应任务。 |
| except Exception as e: |
S14 将异常返回从 return(无返回值)改为 return context(有返回值),与新签名一致。 |
| return context |
错误时返回当前 context,让 queue_processor_loop 可以正确更新 session_context。 |
| if response.stop_reason != "tool_use": |
与 S13 相同,LLM 停止使用工具时退出循环。 |
| return context |
S14 新增:正常退出也返回 context(S13 是裸 return)。 |
功能
独立 daemon 线程,轮询检查"有 cron 工作 + agent 空闲"两个条件,同时满足时自动唤醒 agent 处理定时任务,无需用户输入。
| def queue_processor_loop(): |
无限循环,运行在独立 daemon 线程中(主函数启动)。 |
| global session_context |
声明修改全局 session_context(在 run_agent_turn_locked 内更新)。 |
| while True: |
无限轮询。 |
| time.sleep(0.2) |
每 200ms 检查一次队列(比调度线程的 1s 更频繁),实现近实时响应。sleep 放循环开头避免 busy-wait 消耗 CPU。 |
| if not has_cron_queue(): |
快速路径:无工作则跳过,不尝试获锁。大多数时刻走此路径。 |
| continue |
跳回 while 开头,再次 sleep。 |
| if not agent_lock.acquire(blocking=False): |
非阻塞获锁:blocking=False 使 acquire 立即返回 True(获锁成功)或 False(锁被持有)。若 agent 正忙(用户在交互),不等待,下次循环再试。 |
| continue |
agent 忙则跳过,200ms 后再试。 |
| try: |
try/finally 确保即使 run_agent_turn_locked 内部抛异常,agent_lock 也会被释放。 |
| if not has_cron_queue(): |
二次检查(double-check):获锁后再次确认队列非空。防止在"检查到获锁"这段时间窗口内,另一个线程(或用户输入触发的 agent_loop)已经消费了队列。 |
| continue |
二次检查发现队列为空,直接 continue(finally 会释锁)。 |
| print("\n \033[35m[queue processor] delivering scheduled work\033[0m") |
\033[35m:ANSI 紫色。开头 \n 是为了不与用户正在输入的提示符混在同一行。 |
| run_agent_turn_locked() |
无参数调用(无用户输入),agent_loop 将消费 cron_queue 中的任务并执行。 |
| finally: |
无论 try 块是正常完成还是异常,都执行 finally 块。 |
| agent_lock.release() |
释放 agent_lock,允许用户交互或下一轮 queue_processor 运行。 |
queue_processor_loop 决策树每 200ms:
has_cron_queue()?
├── No → continue(快速路径,99.9% 的时刻)
└── Yes → agent_lock.acquire(blocking=False)?
├── False (agent 忙) → continue(等 200ms 再试)
└── True (agent 空闲)
→ has_cron_queue()?(二次检查)
├── No → continue(release lock via finally)
└── Yes → run_agent_turn_locked()
→ agent_loop 消费 cron_queue
→ release agent_lock
功能
S14 将 main 函数的 history/context 局部变量提升为模块级全局变量,使 queue_processor_loop 可以访问并修改同一会话状态。run_agent_turn_locked 封装了一次 agent 调用的完整流程。
| session_history: list = [] |
模块级全局变量(S13 在 __main__ 中是局部变量 history)。升级为全局使 queue_processor_loop 可以向此历史添加消息。 |
| session_context = update_context({}, []) |
模块加载时初始化 context。queue_processor_loop 通过 global session_context 修改。 |
| def print_latest_assistant_text(messages: list): |
从 messages 最后一条 assistant 消息中提取并打印文本内容。独立函数供 run_agent_turn_locked 和 main loop 复用。 |
| if isinstance(content, str): |
content 可能是 str(简单文本消息)或 list(含多个 block),需要两种处理路径。 |
| if getattr(block, "type", None) == "text": |
getattr(obj, name, default):安全属性访问,对象无此属性时返回 default 而不抛 AttributeError。block 可能是 SDK 对象(有 .type 属性)或 dict(用 block.get)。 |
| def run_agent_turn_locked(user_query: str | None = None): |
参数可选:有用户输入时添加 user 消息,无输入时(queue_processor 调用)直接执行(消费 cron_queue)。调用者必须持有 agent_lock。 |
| global session_context |
声明修改全局 session_context。 |
| if user_query is not None: |
用 is not None 而非 if user_query,区分"空字符串输入"和"无输入"两种情况(虽然实际上已在 main 过滤了空字符串)。 |
| session_history.append({"role": "user", "content": user_query}) |
只有有用户输入时才添加 user 消息,无输入时直接让 agent_loop 消费 cron_queue。 |
| session_context = agent_loop(session_history, session_context) |
接收 agent_loop 返回的 context(S14 新增返回值),更新 session_context。 |
| session_context = update_context(session_context, session_history) |
agent_loop 结束后再次更新 context(如加载最新 memories)。 |
四层架构全流程示例# 09:00:00 定时触发"Run daily standup"
Layer 1 (scheduler thread, 09:00:00):
cron_matches("0 9 * * *") → True
cron_queue.append(standup_job)
Layer 2 (cron_queue):
[standup_job] ← 等待消费
Layer 3 (queue_processor_loop, 09:00:00.2):
has_cron_queue() → True
agent_lock.acquire(blocking=False) → True(用户未在输入)
run_agent_turn_locked() ← 无 user_query 参数
Layer 4 (agent_loop via run_agent_turn_locked):
fired = consume_cron_queue() → [standup_job]
messages.append({"role":"user", "content":"[Scheduled] Run daily standup..."})
→ LLM 收到此消息,开始执行 standup 任务
→ 工具调用:list_tasks, complete_task, ...
→ LLM 生成摘要消息
print_latest_assistant_text()
agent_lock.release() ← via finally