s04
GraphRAG 构建
知识层 · Zep Cloud 知识图谱动态构建
动态生成 Pydantic 模型设置本体,批量灌入文本构建图谱。从 JSON Schema 到 Pydantic 类的动态代码生成,让图谱结构在运行时由 LLM 决定而非硬编码。
Zep Cloud 知识图谱动态构建
动态生成 Pydantic 模型设置本体,批量灌入文本构建图谱。从 JSON Schema 到 Pydantic 类的动态代码生成,让图谱结构在运行时由 LLM 决定而非硬编码。
本体 Schema 由 LLM 在运行时生成,实体类型和关系类型每次都不同。GraphRAG 构建模块需要将这些动态 Schema 转化为 Zep 能理解的 Pydantic 模型,然后批量灌入文本并等待异步处理完成。这是整个流水线中与外部服务交互最密集的环节。
核心创新:从 JSON 动态创建 Pydantic 类,让图谱结构在运行时由 LLM 决定
def set_ontology(self, graph_id, ontology):
    """Build entity model classes from an ontology dict and register them.

    Each entry of ``ontology["entity_types"]`` is turned into a subclass of
    ``EntityModel`` created at runtime with ``type()``. Every attribute of
    the entry becomes an ``Optional[EntityText]`` field that defaults to
    ``None``. The resulting ``{type name: class}`` mapping is passed to
    ``self.client.graph.set_ontology`` for ``graph_id``.

    Args:
        graph_id: Identifier of the graph whose ontology is being set.
        ontology: Mapping with an optional ``"entity_types"`` list. Each
            entry must have ``"name"`` and may have ``"description"`` and
            ``"attributes"``; each attribute must have ``"name"`` and may
            have ``"description"``.

    Raises:
        KeyError: If an entity type or attribute definition lacks ``"name"``.
    """
    from zep_cloud.external_clients.ontology import EntityModel, EntityText

    # Not referenced directly in this block; presumably consulted by
    # safe_attr_name (defined outside this view) -- TODO confirm.
    RESERVED_NAMES = {'uuid', 'name', 'group_id', 'created_at', 'summary'}

    def build_model(spec):
        """Return ``(type name, generated EntityModel subclass)`` for one spec."""
        type_name = spec["name"]
        namespace = {"__doc__": spec.get("description", "")}
        field_types = {}
        for field_spec in spec.get("attributes", []):
            # safe_attr_name steers the field name away from reserved names.
            field_name = safe_attr_name(field_spec["name"])
            namespace[field_name] = Field(
                description=field_spec.get("description"),
                default=None,
            )
            field_types[field_name] = Optional[EntityText]
        namespace["__annotations__"] = field_types
        # Create the Pydantic subclass dynamically from the assembled namespace.
        return type_name, type(type_name, (EntityModel,), namespace)

    entity_types = dict(
        build_model(spec) for spec in ontology.get("entity_types", [])
    )
    self.client.graph.set_ontology(graph_ids=[graph_id], entities=entity_types)


# Batch ingestion + polling: send text chunks and wait for Zep's asynchronous
# processing to finish.
def add_text_batches(self, graph_id, chunks, batch_size=3, progress_callback=None):
    """Send text chunks to the graph in batches and collect episode UUIDs.

    The chunks are sliced into groups of ``batch_size``. Each group is
    wrapped in ``EpisodeData`` objects of type ``"text"`` and submitted with
    ``self.client.graph.add_batch``. The call pauses for one second after
    every batch.

    Args:
        graph_id: Identifier of the graph receiving the text.
        chunks: Sequence of text chunks to ingest.
        batch_size: Number of chunks submitted per request.
        progress_callback: Accepted for interface compatibility; this block
            never invokes it.

    Returns:
        List of the truthy ``uuid_`` (or, failing that, ``uuid``) values
        found on the returned episodes, in submission order.
    """
    collected = []
    total = len(chunks)
    for start in range(0, total, batch_size):
        payload = [
            EpisodeData(data=text, type="text")
            for text in chunks[start:start + batch_size]
        ]
        created = self.client.graph.add_batch(graph_id=graph_id, episodes=payload)
        # Prefer the `uuid_` attribute and fall back to `uuid`.
        identifiers = (
            getattr(item, 'uuid_', None) or getattr(item, 'uuid', None)
            for item in created
        )
        collected.extend(found for found in identifiers if found)
        time.sleep(1)  # avoid sending requests too quickly
    return collected
def _wait_for_episodes(self, episode_uuids, timeout=600):
    """Poll until every listed episode reports ``processed``.

    Each round fetches every still-pending episode through
    ``self.client.graph.episode.get`` and drops those whose ``processed``
    attribute is truthy. Rounds are separated by a pause of up to three
    seconds. At least one round always runs, even with ``timeout=0``.

    Args:
        episode_uuids: Iterable of episode UUIDs to wait for. Duplicates
            are collapsed; an empty iterable returns immediately.
        timeout: Maximum number of seconds to wait. ``None`` waits
            indefinitely.

    Raises:
        TimeoutError: If some episodes are still unprocessed once
            ``timeout`` seconds have elapsed.
    """
    poll_interval = 3  # seconds between polling rounds
    pending = set(episode_uuids)
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = None if timeout is None else time.monotonic() + timeout

    while pending:
        finished = {
            ep_uuid
            for ep_uuid in pending
            if getattr(self.client.graph.episode.get(uuid_=ep_uuid), 'processed', False)
        }
        pending -= finished
        if not pending:
            break

        if deadline is None:
            time.sleep(poll_interval)
            continue

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(
                f"{len(pending)} episode(s) still unprocessed after {timeout} seconds"
            )
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))