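Introduction
The release of GraphRAG (knowledge-graph-based RAG) drew wide attention in the technical community. By converting unstructured text into a structured knowledge graph, it improves a large language model's ability to handle complex queries. Neo4j, one of the leading graph databases, is well suited to storing and querying such graphs. This article explains how to import the index output generated by GraphRAG into Neo4j for persistent storage and visualization.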
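Environment Setup
This tutorial relies on two pieces: a GraphRAG environment (its output is assumed to have been generated already) and a Neo4j graph database. The focus here is on getting Neo4j running quickly. To keep configuration simple, we install it as a Docker container and enable the APOC plugin, which the import statements use later.
1. Start the Neo4j container
Run the following command to bring up Neo4j. It maps HTTP port 7474 and Bolt port 7687 and enables the required APOC settings.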
docker run \
    -p 7474:7474 -p 7687:7687 \
    --name neo4j-apoc \
    -e NEO4J_apoc_export_file_enabled=true \
    -e NEO4J_apoc_import_file_enabled=true \
    -e NEO4J_apoc_import_file_use__neo4j__config=true \
    -e NEO4J_PLUGINS='["apoc"]' \
    neo4j:5.21.2
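2. Verify the installation
Once the container is up, the log should show that the service is ready. Open http://localhost:7474; the default username is neo4j and the initial password is also neo4j. On first login you are required to change the password, after which you reach the management interface.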
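Before opening the browser, it can help to confirm that both mapped ports accept connections. The following is a minimal sketch using only the standard library; the helper name `port_open` is hypothetical, and the host and ports are the ones assumed by the Docker command above.

```python
import socket


def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    # Ports taken from the docker run command: 7474 (HTTP) and 7687 (Bolt).
    for label, port in (("HTTP", 7474), ("Bolt", 7687)):
        state = "reachable" if port_open("localhost", port) else "not reachable"
        print(f"{label} port {port}: {state}")
```

An open port only shows that something is listening; the login step described above is still needed to confirm that the service is actually Neo4j.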
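Python Environment Setup
Before importing data, make sure the required libraries are installed in your Python environment: pandas for data handling, a Parquet engine such as pyarrow (needed by pd.read_parquet), and the Neo4j Python driver, installed here through the neo4j-rust-ext package, which pulls in the official driver together with its Rust extensions.
pip3 install --quiet pandas pyarrow neo4j-rust-ext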
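Connection and Schema Design
1. Establish the database connection
Create a driver object with the neo4j driver. Replace the URI, username, and password with your own values.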
import pandas as pd
from neo4j import GraphDatabase
import time
NEO4J_URI = "neo4j://localhost"
NEO4J_USERNAME = "neo4j"
NEO4J_PASSWORD = "your_password_here"
NEO4J_DATABASE = "neo4j"
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USERNAME, NEO4J_PASSWORD))
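If you would rather not hard-code credentials in the script, one option is to read them from environment variables. The sketch below is only an illustration: the function name `load_neo4j_settings` and the environment variable names are assumptions, not something GraphRAG or Neo4j prescribes.

```python
import os
from typing import Mapping, Optional


def load_neo4j_settings(env: Optional[Mapping[str, str]] = None) -> dict:
    """Read connection settings from environment variables (names are hypothetical)."""
    env = os.environ if env is None else env
    password = env.get("NEO4J_PASSWORD")
    if not password:
        # Fail early instead of attempting a connection with an empty password.
        raise ValueError("NEO4J_PASSWORD is not set")
    return {
        "uri": env.get("NEO4J_URI", "neo4j://localhost"),
        "username": env.get("NEO4J_USERNAME", "neo4j"),
        "password": password,
        "database": env.get("NEO4J_DATABASE", "neo4j"),
    }
```

The returned values can then be passed to the driver, for example `GraphDatabase.driver(settings["uri"], auth=(settings["username"], settings["password"]))`.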
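2. Define uniqueness constraints
To preserve data integrity and avoid duplicate nodes or relationships, create uniqueness constraints before importing. They cover document IDs, chunk IDs, entity IDs, and so on.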
statements = [
    "create constraint chunk_id if not exists for (c:__Chunk__) require c.id is unique;",
    "create constraint document_id if not exists for (d:__Document__) require d.id is unique;",
    "create constraint community_id if not exists for (c:__Community__) require c.community is unique;",
    "create constraint entity_id if not exists for (e:__Entity__) require e.id is unique;",
    "create constraint entity_title if not exists for (e:__Entity__) require e.name is unique;",
    "create constraint covariate_title if not exists for (e:__Covariate__) require e.title is unique;",
    "create constraint related_id if not exists for ()-[rel:RELATED]->() require rel.id is unique;"
]
# Each constraint needs its own name: with "if not exists", a statement that
# reuses an existing constraint name is skipped without an error.
for statement in statements:
    if len((statement or "").strip()) > 0:
        print(statement)
        driver.execute_query(statement, database_=NEO4J_DATABASE)
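When a list like this grows, it is easy to reuse a constraint name by accident. With `if not exists`, Neo4j does nothing for a statement whose name is already taken, so the second constraint would be missing without any error. The sketch below checks for that before anything is sent to the database; the helper name and the sample statements are hypothetical.

```python
import re
from collections import Counter

# Matches the constraint name in "create constraint <name> if not exists ...".
_CONSTRAINT_NAME = re.compile(
    r"create\s+constraint\s+(\w+)\s+if\s+not\s+exists", re.IGNORECASE
)


def duplicate_constraint_names(statements):
    """Return the sorted constraint names that appear in more than one statement."""
    names = [m.group(1) for s in statements if (m := _CONSTRAINT_NAME.search(s))]
    return sorted(name for name, count in Counter(names).items() if count > 1)


sample = [
    "create constraint node_id if not exists for (a:A) require a.id is unique;",
    "create constraint node_id if not exists for (b:B) require b.id is unique;",
    "create constraint other_id if not exists for (c:C) require c.id is unique;",
]
print(duplicate_constraint_names(sample))  # ['node_id']
```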
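Batch Import Workflow
To improve throughput, we import in batches. Each call sends a fixed number of rows, which keeps individual transactions small and avoids running out of memory.
1. Wrap batching in a helper function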
def batched_import(statement, df, batch_size=1000):
    """
    Import a DataFrame into Neo4j in batches.
    :param statement: Cypher statement applied to each row (bound to `value`)
    :param df: pandas DataFrame to import
    :param batch_size: number of rows per batch
    """
    total = len(df)
    start_s = time.time()
    for start in range(0, total, batch_size):
        batch = df.iloc[start: min(start + batch_size, total)]
        result = driver.execute_query(
            "UNWIND $rows AS value " + statement,
            rows=batch.to_dict('records'),
            database_=NEO4J_DATABASE
        )
        # Report what this batch created or updated.
        print(result.summary.counters)
    print(f'{total} rows in {time.time() - start_s} s.')
    return total
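To see how the `range(0, total, batch_size)` loop partitions the rows, here is the same slicing logic in isolation, without pandas or a database. The generator name `iter_batches` is hypothetical and exists only for this illustration.

```python
from typing import Iterator, List, Sequence


def iter_batches(rows: Sequence[dict], batch_size: int = 1000) -> Iterator[List[dict]]:
    """Yield consecutive slices of at most batch_size rows."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(rows), batch_size):
        # Slicing past the end is safe, so the last batch is simply shorter.
        yield list(rows[start:start + batch_size])


rows = [{"id": i} for i in range(2500)]
print([len(batch) for batch in iter_batches(rows, 1000)])  # [1000, 1000, 500]
```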
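2. Import documents and text units
Start with the base document records and their chunks (text units), linking each chunk to the documents it belongs to.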
GRAPHRAG_FOLDER = "./output/artifacts"
doc_df = pd.read_parquet(f'{GRAPHRAG_FOLDER}/create_final_documents.parquet', columns=["id", "title"])
statement_doc = """
MERGE (d:__Document__ {id:value.id})
SET d += value {.title}
"""
batched_import(statement_doc, doc_df)
text_df = pd.read_parquet(f'{GRAPHRAG_FOLDER}/create_final_text_units.parquet',
columns=["id","text","n_tokens","document_ids"])
statement_chunk = """
MERGE (c:__Chunk__ {id:value.id})
SET c += value {.text, .n_tokens}
WITH c, value
UNWIND value.document_ids AS document
MATCH (d:__Document__ {id:document})
MERGE (c)-[:PART_OF]->(d)
"""
batched_import(statement_chunk, text_df)
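In the chunk statement, the `MATCH` on `__Document__` returns no rows for an unknown document ID, so the corresponding `PART_OF` relationship is simply not created and no error is raised. A quick pre-check can surface such gaps. The sketch below works on plain lists of dicts (for example the result of `doc_df.to_dict('records')`); the function name is hypothetical.

```python
def missing_document_ids(documents, text_units):
    """Map each text unit ID to the document IDs it references that are not in documents."""
    known = {doc["id"] for doc in documents}
    missing = {}
    for unit in text_units:
        absent = [doc_id for doc_id in unit["document_ids"] if doc_id not in known]
        if absent:
            missing[unit["id"]] = absent
    return missing


docs = [{"id": "d1", "title": "a.txt"}]
units = [
    {"id": "c1", "document_ids": ["d1"]},
    {"id": "c2", "document_ids": ["d1", "d9"]},
]
print(missing_document_ids(docs, units))  # {'c2': ['d9']}
```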
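3. Import entities and relationships
Entities are the core nodes of the graph, and relationships describe how they interact. The import adds labels dynamically from each entity's type and stores the description embedding as a vector property.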
entity_df = pd.read_parquet(f'{GRAPHRAG_FOLDER}/create_final_entities.parquet',
columns=["name", "type", "description", "human_readable_id", "id", "description_embedding", "text_unit_ids"])
entity_statement = """
MERGE (e:__Entity__ {id:value.id})
SET e += value {.human_readable_id, description: replace(value.description,'"',''), name: replace(value.name,'"','')}
WITH e, value
CALL db.create.setNodeVectorProperty(e, "description_embedding", value.description_embedding)
CALL apoc.create.addLabels(e, case when coalesce(value.type,"") = "" then [] else [apoc.text.upperCamelCase(replace(value.type,'"',''))] end) yield node
UNWIND value.text_unit_ids AS text_unit
MATCH (c:__Chunk__ {id:text_unit})
MERGE (c)-[:HAS_ENTITY]->(e)
"""
batched_import(entity_statement, entity_df)
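If you plan to build a vector index on `description_embedding` later, the index is typically created for one fixed dimension, so it is worth checking that all embeddings have the same length. This is a minimal sketch under that assumption; the function name is hypothetical, and rows are plain dicts such as those produced by `entity_df.to_dict('records')`.

```python
from collections import Counter


def embedding_dimension_counts(rows, field="description_embedding"):
    """Count rows per embedding length; rows without an embedding are counted under None."""
    counts = Counter()
    for row in rows:
        vector = row.get(field)
        counts[None if vector is None else len(vector)] += 1
    return dict(counts)


sample_rows = [
    {"id": "e1", "description_embedding": [0.1, 0.2, 0.3]},
    {"id": "e2", "description_embedding": [0.4, 0.5, 0.6]},
    {"id": "e3", "description_embedding": None},
]
print(embedding_dimension_counts(sample_rows))  # {3: 2, None: 1}
```

More than one non-None key in the result means the embeddings were produced with different dimensions and should be regenerated or filtered before indexing.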
rel_df = pd.read_parquet(f'{GRAPHRAG_FOLDER}/create_final_relationships.parquet',
columns=["source", "target", "id", "rank", "weight", "human_readable_id", "description", "text_unit_ids"])
rel_statement = """
MATCH (source:__Entity__ {name:replace(value.source,'"','')})
MATCH (target:__Entity__ {name:replace(value.target,'"','')})
MERGE (source)-[rel:RELATED {id: value.id}]->(target)
SET rel += value {.rank, .weight, .human_readable_id, .description, .text_unit_ids}
RETURN count(*) as createdRels
"""
batched_import(rel_statement, rel_df)
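The relationship statement looks up both endpoints by entity name with double quotes removed. A relationship whose source or target does not match any entity is skipped silently, because the `MATCH` yields no row. The sketch below reproduces that matching rule in plain Python, assuming entity names are stored with the same quote stripping; the helper names are hypothetical.

```python
def clean_name(name: str) -> str:
    """Mirror the Cypher expression replace(x, '"', '') used in the import statements."""
    return name.replace('"', '')


def unmatched_relationships(entities, relationships):
    """Return IDs of relationships whose source or target has no matching entity name."""
    names = {clean_name(entity["name"]) for entity in entities}
    return [
        rel["id"]
        for rel in relationships
        if clean_name(rel["source"]) not in names
        or clean_name(rel["target"]) not in names
    ]


entities = [{"name": '"A"'}, {"name": "B"}]
relationships = [
    {"id": "r1", "source": "A", "target": '"B"'},
    {"id": "r2", "source": "A", "target": "C"},
]
print(unmatched_relationships(entities, relationships))  # ['r2']
```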
4. Import communities and reports
GraphRAG's hierarchy is represented by Community nodes, which also carry the community report summaries and findings.
community_df = pd.read_parquet(f'{GRAPHRAG_FOLDER}/create_final_communities.parquet',
columns=["id", "level", "title", "text_unit_ids", "relationship_ids"])
statement_community = """
MERGE (c:__Community__ {community:value.id})
SET c += value {.level, .title}
WITH c, value
UNWIND value.relationship_ids as rel_id
MATCH (start:__Entity__)-[:RELATED {id:rel_id}]->(end:__Entity__)
MERGE (start)-[:IN_COMMUNITY]->(c)
MERGE (end)-[:IN_COMMUNITY]->(c)
RETURN count(distinct c) as createdCommunities
"""
batched_import(statement_community, community_df)
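The community statement derives membership indirectly: for every relationship ID listed on a community, both endpoints of that relationship are attached to the community. The following sketch expresses the same rule over plain dicts so the expected membership can be inspected before importing; the function name and the sample data are hypothetical.

```python
def community_members(communities, relationships):
    """Return {community_id: set of entity names} derived from each community's relationship_ids."""
    endpoints = {rel["id"]: (rel["source"], rel["target"]) for rel in relationships}
    members = {}
    for community in communities:
        names = set()
        for rel_id in community["relationship_ids"]:
            # Unknown relationship IDs contribute nothing, as with the MATCH in Cypher.
            if rel_id in endpoints:
                names.update(endpoints[rel_id])
        members[community["id"]] = names
    return members


rels = [
    {"id": "r1", "source": "A", "target": "B"},
    {"id": "r2", "source": "B", "target": "C"},
]
comms = [
    {"id": "0", "relationship_ids": ["r1", "r2"]},
    {"id": "1", "relationship_ids": ["r2", "r404"]},
]
print(community_members(comms, rels) == {"0": {"A", "B", "C"}, "1": {"B", "C"}})  # True
```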
community_report_df = pd.read_parquet(f'{GRAPHRAG_FOLDER}/create_final_community_reports.parquet',
columns=["id", "community", "level", "title", "summary", "findings", "rank", "rank_explanation", "full_content"])
community_statement = """
MATCH (c:__Community__ {community: value.community})
SET c += value {.level, .title, .rank, .rank_explanation, .full_content, .summary}
WITH c, value
UNWIND range(0, size(value.findings)-1) AS finding_idx
WITH c, value, finding_idx, value.findings[finding_idx] as finding
MERGE (c)-[:HAS_FINDING]->(f:Finding {id: finding_idx})
SET f += finding
"""
batched_import(community_statement, community_report_df)
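The report statement turns each entry of `findings` into a `Finding` node whose `id` is its position in the list. To preview which nodes that would produce, the findings can be flattened in plain Python first. This is a sketch with a hypothetical function name, and it assumes each finding is a dict of properties.

```python
def flatten_findings(reports):
    """Return one row per finding, tagged with its community and its index in the list."""
    rows = []
    for report in reports:
        findings = report.get("findings")
        if findings is None:
            continue
        for idx, finding in enumerate(findings):
            # The index plays the same role as finding_idx in the Cypher statement.
            rows.append({**finding, "community": report["community"], "id": idx})
    return rows


reports = [
    {"community": "0", "findings": [{"summary": "s0"}, {"summary": "s1"}]},
    {"community": "1", "findings": None},
]
for row in flatten_findings(reports):
    print(row)
```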
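Results and Analysis
After the steps above, the data is stored in Neo4j and can be explored visually in Neo4j Browser.
1. Graph overview
Node colors distinguish node types (for example Entity, Document, Community). Zooming and panning reveal the overall network topology. In a sample built from the world of 《斗破苍穹》 (Battle Through the Heavens), for instance, you can see the dense web of connections between characters, places, and events.
2. Common query examples
- Find the neighbors of a specific entity: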
MATCH (e:__Entity__ {name:"萧炎"})-[*1..2]-(neighbor)
RETURN e, neighbor
- Inspect the community hierarchy:
MATCH (e:__Entity__)-[:IN_COMMUNITY]->(c:__Community__)
RETURN c.level, count(e) AS entity_count
ORDER BY c.level DESC
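When running the neighbor query from Python, the entity name is best passed as a query parameter rather than concatenated into the string. The hop limit is written as a literal inside the pattern here, so this sketch validates it before interpolating it. The function name and the upper bound of 5 hops are assumptions made for illustration.

```python
def neighbor_query(name: str, max_hops: int = 2):
    """Build the neighbor query and its parameters for a given entity name."""
    # Reject bools and out-of-range values before the number is placed in the query text.
    if isinstance(max_hops, bool) or not isinstance(max_hops, int) or not 1 <= max_hops <= 5:
        raise ValueError("max_hops must be an integer between 1 and 5")
    cypher = (
        "MATCH (e:__Entity__ {name: $name})"
        f"-[*1..{max_hops}]-(neighbor) "
        "RETURN e, neighbor"
    )
    return cypher, {"name": name}


cypher, params = neighbor_query("萧炎")
print(cypher)
print(params)
```

The result can be passed to the driver as `driver.execute_query(cypher, parameters_=params, database_=NEO4J_DATABASE)`.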
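Troubleshooting
- Connection refused: check whether the ports mapped by Docker are already in use, and confirm that the neo4j://localhost address is correct.
- Authentication errors: make sure the username and password are correct and that the password has not expired.
- Slow imports: increase the batch_size parameter or improve the hardware resources; also check disk I/O performance.
- Constraint violations: if data already exists with duplicate IDs, clean up the old data first or adjust the import logic.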
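Summary
This article walked through the full path from GraphRAG's exported results to a populated Neo4j database. With a well-defined data model and an efficient import script, unstructured text becomes a queryable knowledge graph. That helps in understanding how GraphRAG works internally and also provides a solid structured foundation for later retrieval-augmented applications built on large language models. In production, consider pairing this with a regular incremental update strategy to keep the graph current.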