S09: Memory System — 代码解析
相对于 S08 新增:持久化跨会话记忆 — 文件存储 + 索引 + 自动提取 + 相关性选取 + 合并整理
1. 整体功能概览
01
解决什么问题
S08 的对话记忆只在单次进程中存在。S09 引入 .memory/ 目录,将用户偏好、项目事实、反馈以 Markdown + YAML frontmatter 形式持久写入磁盘,下次启动依然可用。
02
三级记忆流水线
索引(MEMORY.md,每轮注入 system)→ 选取(LLM 语义匹配,找出相关文件)→ 内容注入(在当前 user 消息前插入)。
03
自动生命周期
每轮结束后自动 extract_memories(从对话中提炼新记忆),累计超过 10 条时触发 consolidate_memories(LLM 去重合并)。
记忆类型:user(用户偏好)、feedback(反馈指导)、project(项目事实)、reference(外部指针)
文件结构示意
.memory/
MEMORY.md # 索引文件 — 每行一条记忆指针
user-preference-tabs.md # 单个记忆文件(YAML frontmatter + Markdown body)
project-facts.md
feedback-code-style.md
# MEMORY.md 示例内容:
- [user-preference-tabs](user-preference-tabs.md) — 用户偏好使用 tabs 缩进
2. 常量与目录初始化
S09 在 S08 的 WORKDIR / SKILLS_DIR / TRANSCRIPT_DIR 基础上新增了 MEMORY_DIR 和 MEMORY_INDEX:
| WORKDIR = Path.cwd() |
工作目录锚点;所有相对路径的基准 |
| MEMORY_DIR = WORKDIR / ".memory" |
pathlib / 运算符重载 → 路径拼接;等价于 os.path.join(WORKDIR, ".memory") |
| MEMORY_DIR.mkdir(exist_ok=True) |
Path.mkdir();exist_ok=True 表示已存在时不抛 FileExistsError |
| MEMORY_INDEX = MEMORY_DIR / "MEMORY.md" |
索引文件路径对象;直接用 Path 读写,无需 open() |
| MEMORY_TYPES = ["user", "feedback",
"project", "reference"] |
记忆类型枚举列表;写入 frontmatter 的 type 字段只能取这四个值 |
3. YAML Frontmatter 解析
_parse_frontmatter() 从记忆文件中提取元数据(name、description、type),返回 (meta_dict, body_str) 二元组:
| def _parse_frontmatter(text: str)
-> tuple[dict, str]: |
Python 3.9+ 内置泛型语法 tuple[dict, str];无需 from typing import Tuple |
| if not text.startswith("---"): |
frontmatter 规范:文件必须以 --- 开头;否则视为无元数据 |
| return {}, text |
早期返回:meta 为空 dict,body 为全文 |
| parts = text.split("---", 2) |
split(sep, maxsplit=2) 最多分成 3 段:["", frontmatter, body] |
| if len(parts) < 3: |
缺少结束 --- 时退化处理,避免 IndexError |
| for line in parts[1].strip().splitlines(): |
parts[1] 是 frontmatter 区域;splitlines() 按行迭代 |
| if ":" in line:
k, v = line.split(":", 1) |
split(":", 1) 最多分成 2 段,防止 value 中含冒号被截断 |
| meta[k.strip()] = v.strip()
.strip('"').strip("'") |
链式 strip():先去首尾空白,再去引号(YAML 允许带引号的字符串) |
| return meta, parts[2].strip() |
parts[2] 是正文;strip() 去掉 frontmatter 后的空行 |
4. 记忆文件读写 + 索引重建
write_memory_file()
| def write_memory_file(name, mem_type,
description, body): |
写入单个记忆文件并自动重建索引 |
| slug = name.lower()
.replace(" ", "-")
.replace("/", "-") |
将 name 转为合法文件名(kebab-case);链式 replace 处理空格和斜杠 |
| filename = f"{slug}.md" |
f-string 格式化;记忆文件统一用 .md 后缀 |
| filepath = MEMORY_DIR / filename |
Path 对象拼接,得到完整绝对路径 |
| filepath.write_text(
f"---\nname: {name}\n..."
) |
Path.write_text() 一次性写入;自动以 UTF-8 编码(Python 3.10+ 默认) |
| _rebuild_index() |
每次写文件后立即重建 MEMORY.md,保持索引与磁盘同步 |
_rebuild_index()
| for f in sorted(MEMORY_DIR.glob("*.md")): |
Path.glob() 返回生成器;sorted() 保证索引顺序稳定 |
| if f.name == "MEMORY.md":
continue |
跳过索引文件本身,避免自引用 |
| meta, body = _parse_frontmatter(
f.read_text()) |
解包二元组;f.read_text() 无需 open/close |
| name = meta.get("name", f.stem) |
dict.get(key, default);f.stem 是去掉后缀的文件名(Path 属性) |
| desc = meta.get("description",
body.split("\n")[0][:80]) |
fallback:取正文第一行前 80 字符;[:80] 切片防止超长 |
| lines.append(
f"- [{name}]({f.name}) — {desc}") |
Markdown 链接语法 [text](href);每行一条记忆 |
| MEMORY_INDEX.write_text(
"\n".join(lines) + "\n" if lines else "") |
三元表达式;空列表时写空字符串而非一个孤立换行 |
read_memory_index() / read_memory_file() / list_memory_files()
| def read_memory_index() -> str: |
读取索引内容;每轮注入 SYSTEM prompt;返回空字符串而非 None,调用方无需判空 |
| if not MEMORY_INDEX.exists():
return "" |
首次启动时索引文件尚不存在;Path.exists() 防 FileNotFoundError |
| def read_memory_file(filename: str)
-> str | None: |
Python 3.10+ Union 简写 str | None;找不到文件返回 None |
| def list_memory_files() -> list[dict]: |
返回所有记忆的元数据列表,供 select/extract/consolidate 使用 |
| result.append({
"filename": f.name,
"name": ..., "description": ...,
"type": ..., "body": body,
}) |
统一数据结构;body 字段包含 Markdown 正文,consolidate 时需要全文 |
5. 相关记忆选取 (select_relevant_memories)
核心逻辑:用 LLM 对比「最近对话文本」与「记忆目录摘要」,返回相关文件名列表;LLM 失败则退化为关键词匹配。
| def select_relevant_memories(
messages: list, max_items: int = 5)
-> list[str]: |
参数默认值 max_items=5;返回文件名字符串列表 |
| files = list_memory_files()
if not files:
return [] |
早期返回;无记忆时跳过 LLM 调用,节省 token |
| for msg in reversed(messages):
if msg.get("role") == "user": |
reversed() 返回反转迭代器,从最新消息向前扫描 |
| content = msg.get("content", "")
if isinstance(content, list): |
content 可能是字符串(普通轮次)或 list(含 tool_result 的轮次) |
| content = " ".join(
str(getattr(b,"text",""))
for b in content
if getattr(b,"type",None)
== "text") |
生成器表达式;getattr(b,"text","") 兼容 SDK 对象和 dict 两种 block 形式 |
| recent = " ".join(
reversed(recent_texts))[:2000] |
恢复时间顺序后截断 2000 字符;防止 prompt 过大 |
| catalog_lines = []
for i, f in enumerate(files):
catalog_lines.append(
f"{i}: {f['name']} — {f['description']}") |
enumerate() 给每个记忆编号;LLM 只需返回序号数组,不用输出完整文件名 |
| prompt = (
"Return ONLY a JSON array of integers...") |
结构化输出约束:要求 LLM 只返回 JSON,减少解析失败概率 |
| max_tokens=200, |
只需要一个 JSON 数组,200 token 充裕;限制避免不必要的消耗 |
| match = re.search(r'\[.*?\]',
text, re.DOTALL) |
正则提取 JSON 数组;re.DOTALL 让 . 匹配换行符;.*? 非贪婪 |
| indices = json.loads(match.group()) |
将 JSON 字符串反序列化为 Python list |
| if isinstance(idx, int)
and 0 <= idx < len(files): |
边界检查:防止 LLM 返回越界序号导致 IndexError |
| except Exception:
pass |
宽泛捕获:LLM 调用或 JSON 解析任何失败都静默降级到关键词匹配 |
| keywords = [w.lower() for w in recent.split()
if len(w) > 3] |
Fallback:列表推导式提取 4 字符以上单词作为关键词,过滤停用词 |
| text = (f["name"] + " "
+ f["description"]).lower()
if any(kw in text for kw in keywords): |
any() 短路求值;只要有一个关键词命中就选中该记忆 |
load_memories() — 内容组装
| def load_memories(messages: list) -> str: |
调用 select_relevant_memories 后,读取全文并包裹 XML 标签返回 |
| parts = ["<relevant_memories>"]
for filename in selected_files:
content = read_memory_file(filename)
if content:
parts.append(content)
parts.append("</relevant_memories>") |
XML 标签让 LLM 清晰区分「记忆注入内容」和「用户真实消息」;list 追加后一次 join |
| return "\n\n".join(parts) |
段落间空两行,便于 LLM 解析不同记忆块 |
7. 记忆合并整理 (consolidate_memories)
当 .memory/ 目录下文件数 ≥ CONSOLIDATE_THRESHOLD(默认 10)时触发,用 LLM 去重合并:
| CONSOLIDATE_THRESHOLD = 10 |
模块级常量;超过此数量触发 Dream(合并)操作 |
| def consolidate_memories(): |
每轮结束后由 agent_loop 调用(在 extract_memories 之后) |
| files = list_memory_files()
if len(files) < CONSOLIDATE_THRESHOLD:
return |
快速退出:低于阈值时不触发,避免频繁 LLM 调用 |
| catalog = "\n\n".join(
f"## {f['filename']}\n..."
for f in files) |
生成器表达式 + join;将所有记忆拼成一个大文档传给 LLM |
| prompt = (
"Keep the total under 30 memories\n"
"Return a JSON array...") |
明确上限 30 条;LLM 负责去重、合并矛盾、删除过时内容 |
| max_tokens=3000 |
合并可能产生较多输出(多条记忆全文),需要更大 token 预算 |
| for f in MEMORY_DIR.glob("*.md"):
if f.name != "MEMORY.md":
f.unlink() |
Path.unlink() 删除文件;先删旧文件,再写入 LLM 返回的合并结果 |
| print(f"\n\033[33m[Memory: consolidated "
f"{len(files)} → {len(items)} memories]\033[0m") |
打印合并前后数量对比;→ Unicode 箭头直接用于 f-string |
8. build_system() — 动态 SYSTEM Prompt
S08 中 SYSTEM 是模块加载时一次性生成的常量。S09 改为函数,每轮 agent_loop 调用,使 SYSTEM 始终包含最新记忆索引:
S08 写法(静态)
# 模块顶层执行一次
SYSTEM = build_system() # ← 常量
def agent_loop(messages):
response = client.messages.create(
system=SYSTEM, # ← 每轮相同
...
)
S09 写法(动态)
# agent_loop 每轮调用
def agent_loop(messages):
system = build_system() # ← 每轮重建
response = client.messages.create(
system=system,
...
)
| def build_system() -> str: |
S09 新增:函数而非常量,每轮 agent_loop 重新调用 |
| index = read_memory_index() |
读取 MEMORY.md 索引内容;空时返回 "" |
| memories_section = (
f"\n\nMemories available:\n{index}"
if index else "") |
三元表达式:有记忆才追加索引段落,无记忆不增加噪声 |
| return (
f"You are a coding agent at {WORKDIR}."
f"{memories_section}\n"
"Relevant memories are injected below. "
"Respect user preferences from memory.\n"
"When the user says 'remember'...") |
括号内隐式字符串拼接;三段 f-string/string 合并为一条 return;告知 LLM 何时触发记忆提取 |
9. Agent Loop 集成
S09 在 S08 的压缩流水线基础上,增加三个记忆相关步骤:
# S09 agent_loop 新增流程(伪代码)
def agent_loop(messages):
# 1. 每轮动态重建 system(含最新记忆索引)
system = build_system()
# 2. 选取并加载相关记忆内容
memories_content = load_memories(messages)
while True:
# 3. 保存压缩前快照(用于准确提取记忆)
pre_compress = [...]
# 4. S08 压缩流水线(不变)
messages[:] = tool_result_budget(messages)
messages[:] = snip_compact(messages)
messages[:] = micro_compact(messages)
# 5. 将记忆内容注入当前 user 消息头部
request_messages[memory_turn] = {
**messages[memory_turn],
"content": memories_content + "\n\n"
+ messages[memory_turn]["content"],
}
# 6. LLM 调用
response = client.messages.create(...)
if response.stop_reason != "tool_use":
# 7. 从压缩前快照提取新记忆
extract_memories(pre_compress)
# 8. 超过阈值则合并
consolidate_memories()
return
关键代码逐行注解
| memories_content = load_memories(messages) |
在进入 while True 之前一次性加载(本轮对话开始时确定相关记忆) |
| memory_turn = (len(messages) - 1
if messages and isinstance(
messages[-1].get("content"), str)
else None) |
记录当前 user 消息的索引;content 必须是 str(不是 list)才能直接拼接记忆 |
| system = build_system() |
每次 agent_loop 调用时重建,确保 SYSTEM 包含最新 MEMORY.md 索引 |
| pre_compress = [m if isinstance(m, dict)
else {"role": m.get("role",""),
"content": str(m.get("content",""))}
for m in messages] |
列表推导式创建快照;SDK 对象转为 dict;压缩后 messages 会改变,快照保留原始内容 |
| if memories_content
and memory_turn is not None
and memory_turn < len(messages): |
三重条件:有记忆内容 + 记录了目标索引 + 索引未超出边界(压缩可能缩短列表) |
| request_messages = messages.copy()
request_messages[memory_turn] = {
**messages[memory_turn],
"content": memories_content + "\n\n"
+ messages[memory_turn]["content"],
} |
messages.copy() 浅拷贝:避免修改原始 messages;** 解包 dict 保留其他键;记忆内容前置 |
| if response.stop_reason != "tool_use":
extract_memories(pre_compress)
consolidate_memories()
return |
只在轮次真正结束时提取记忆(tool_use 中间轮次跳过),传入 pre_compress 保证完整性 |
为什么用 pre_compress 快照?
压缩(snip_compact / micro_compact)会清除中间轮次和 tool_result 内容。若从压缩后的 messages 提取记忆,关键对话细节已丢失。pre_compress 在压缩前拍摄,保留完整信息。