跳到主要内容 使用大语言模型从零构建知识图谱 | 极客日志
Python AI 算法
使用大语言模型从零构建知识图谱 本文介绍了利用大语言模型从零开始构建知识图谱的完整流程。首先配置本地开源模型如 Ollama,提取数据集结构定义节点属性。接着通过提示词工程引导模型生成节点定义与关系三元组,并引入重试机制确保稳定性。随后构建 Cypher 查询模板,验证语法后加载数据至 Neo4j 图数据库。该方法展示了如何自动化处理非结构化业务数据,为设计自定义 GraphRAG 系统提供基础方案。
小熊软糖 发布于 2025/2/6 更新于 2026/4/20 1 浏览通过创建一个自定义流程来自动上传业务数据
在这一节,我会带你创建一个自定义流程,通过大语言模型自动生成节点定义、关系和 Cypher 查询,基于数据集进行操作。这种方法也适用于其他 DataFrame,同时该方法也能够自动识别其 Schema。
需要注意的是,这种方法在性能上会是个问题,尤其是与 Langchain 的 LLMGraphTransformer 相比,我将在下一节中进行介绍。而本节主要帮助你理解如果从零开始构建该过程,从原理出发,帮助你有机会设计自己的 Graph-Builder。实际上,目前所谓最佳方法的主要限制来自于它对数据的天然含义和模式高度敏感。因此,需要跳出固有的思维模式就显得至关重要,这样才能够帮助你从零开始设计 GraphRAG,或利用现有的,最佳实践的 GraphRAG 来满足你的业务需求。
现在,让我们深入研究,设置我们将在接下来的练习中使用的大语言模型。你可以使用 Langchain 所支持的任何大语言模型,只要其性能能够满足你真实(而非真是)的业务需要。
这里我们有两个可选的免费方案:DeepSeek-V3(注册后可获得 10 元的额度,有效期一个月)和 Ollama(可以让你轻松的在本地运行开源模型)。对于这两种方案我都进行了测试,尽管 DeepSeek-V3 提供了和 GPT-4o 类似的性能,我仍然推荐你选择 Ollama 进行学习,这样,你可以更深入的了解从模型下载到运行的整个过程。
在 Ollama 示例中,我们将使用 Qwen2.5-Coder:7B,它针对代码任务进行了微调,并在代码生成、推理和修复代码错误方面表现出色。根据你本地计算机的配置来决定是否使用更高参数量的版本,如 14B 或 32B。
让我们从初始化模型开始:
llm = OllamaLLM(model="qwen2.5-coder:latest" )
让我们开始提取数据集的结构,并定义节点及其属性:
node_structure = "\n" .join([f"{col} : {', ' .join(map (str , movies[col].unique()[:3 ]))} ..." for col in movies.columns])
print (node_structure)
对于数据集中的每一列(例如:电影类型、导演),我们来展示一些样本值。这将帮助大语言模型理解数据格式以及每一列的典型值。
生成节点
接下来,我们使用大语言模型的提示词模板来引导模型如何提取节点及其属性。让我们先看看完整的代码:
import logging
logging.basicConfig(level=logging.logging.INFO)
logger = logging.getLogger(__name__)
def validate_node_definition (node_def: Dict ) -> bool :
"""验证节点结构定义"""
if not isinstance (node_def, dict ):
return False
return (
(v, ) ( (k, ) k v.keys())
v node_def.values()
)
retrying retry
time sleep
( ) -> [ , [ , ]]:
:
response = chain.invoke({ : structure, : example})
node_defs = ast.literal_eval(response)
validate_node_definition(node_defs):
ValueError( )
node_defs
(ValueError, SyntaxError) e:
logger.error( )
node_example = {
: { : , : },
: { : , : },
: { : , : },
}
:
node_chain = define_nodes_prompt | llm
node_definitions = get_node_definitions(node_chain, structure=node_structure, example=node_example)
logger.info( )
Exception e:
logger.error( )
相关免费在线工具 加密/解密文本 使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
RSA密钥对生成器 生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
Mermaid 预览与可视化编辑 基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
curl 转代码 解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
all
isinstance
dict
and
all
isinstance
str
for
in
for
in
from
import
from
import
@retry(stop=stop_after_attempt(3 ), wait=wait_exponential(multiplier=1 , min =4 , max =10 ) )
def
get_node_definitions
chain, structure: str , example: Dict
Dict
str
Dict
str
str
"""使用重试逻辑来获取节点定义"""
try
"structure"
"example"
if
not
raise
"无效的节点结构定"
return
except
as
f"解析节点定义时出错:{e} "
raise
"NodeLabel1"
"property1"
"row['property1']"
"property2"
"row['property2']"
"NodeLabel2"
"property1"
"row['property1']"
"property2"
"row['property2']"
"NodeLabel3"
"property1"
"row['property1']"
"property2"
"row['property2']"
try
f"节点定义:{node_definitions} "
except
as
f"获取节点定义失败:{e} "
raise
在这个代码片段中,我们首先使用 logging 库设置日志记录,logging 是一个 Python 模块,用于跟踪执行过程中的事件(如错误或状态更新):
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
我们使用 basicConfig 配置日志记录,以显示 INFO 级别或更高的消息,并初始化日志记录器实例,我将在代码中用它来记录消息。
这个步骤其实不是必需的,你也可以用 print 语句来代替它。然而,这是一个良好的工程实践。
接下来,我将创建一个函数来验证大语言模型生成的节点:
def validate_node_definition (node_def: Dict ) -> bool :
"""验证节点结构定义"""
if not isinstance (node_def, dict ):
return False
return all (
isinstance (v, dict ) and all (isinstance (k, str ) for k in v.keys())
for v in node_def.values()
)
该函数的输入是一个字典,其中键是节点标签(例如:Movie),值是属性的字典(例如:title、year)。
首先,函数检查 node_def 是否是一个字典,并验证字典中的每个值是否也是字典,并且这些字典中的所有键是否都是字符串。如果结构有效,则返回 True。
接下来,创建一个函数来调用 LLM 链并实际生成节点:
@retry(stop=stop_after_attempt(3 ), wait=wait_exponential(multiplier=1 , min =4 , max =10 ) )
def get_node_definitions (chain, structure: str , example: Dict ) -> Dict [str , Dict [str , str ]]:
"""获取带有重试逻辑的节点定义"""
try :
response = chain.invoke({"structure" : structure, "example" : example})
node_defs = ast.literal_eval(response)
if not validate_node_definition(node_defs):
raise ValueError("无效的节点结构定义" )
return node_defs
如果你不熟悉 Python 中的装饰器,可能会好奇 @retry(…) 这部分是做什么的,可以将其看作是一个包装函数,围绕着实际的 get_node_definitions 函数。在这种情况下,我调用了 retry 装饰器,如果发生错误,它会自动重试该函数。
● stop_after_attempt(3) : 最多重试 3 次。
● wait_exponential : 在重试之间增加延迟的时长(例如:4 秒、8 秒、16 秒等等)。
● chain : LangChain 管道(提示 + LLM)。我会在稍后定义这个管道。
● structure : 数据集结构(列和示例值)。
● example : 用于引导 LLM 的示例节点定义。
接下来,chain.invoke 将结构和示例发送给 LLM,并接收一个字符串作为响应。ast.literal_eval 将字符串响应转换成 Python 字典。
我使用 validate_node_definition 检查解析后的字典是否符合正确的格式,如果结构无效,它会引发 ValueError。
except (ValueError, SyntaxError) as e:
logger.error(f"Error parsing node definitions: {e} " )
raise
如果响应无法解析或验证,会记录错误信息,该函数会抛出异常。
接下来,我们为 LLM 提供一个提示词模板,以引导其完成节点生成任务:
define_nodes_prompt = PromptTemplate(
input_variables=["example" , "structure" ],
template=("""
分析以下数据集结构并提取节点的实体标签及其属性。
节点属性应基于数据集列和它们的值。
返回的结果应为一个字典,其中键是节点标签,值是节点属性。
示例:{example}
数据集结构:
{structure}
确保包括所有可能的节点标签及其属性。
如果某个属性可以是其自己的节点,请将其作为单独的节点标签。
请不要使用三重反引号标识代码块,只需返回元组的列表。
仅返回包含节点标签和属性的字典,不要包含任何其他文本或引号。
""" ),
)
请注意,我提供了本节开始时定义的节点结构,以及如何生成节点字典的示例:
node_example = {
"NodeLabel1" : {"property1" : "row['property1']" , "property2" : "row['property2']" },
"NodeLabel2" : {"property1" : "row['property1']" , "property2" : "row['property2']" },
"NodeLabel3" : {"property1" : "row['property1']" , "property2" : "row['property2']" },
}
在示例中,键是节点标签(例如:Movie、Director),值是映射到数据集列的属性字典(例如:row['property1'])。
try :
node_chain = define_nodes_prompt | llm
node_definitions = get_node_definitions(node_chain, structure=node_structure, example=node_example)
logger.info(f"节点定义:{node_definitions} " )
except Exception as e:
logger.error(f"获取节点定义失败:{e} " )
raise
在 LangChain 中,我们使用结构化提示词 | LLM | … 来创建一个链,将提示词模板与 LLM 结合,形成一个管道。我们使用 get_node_definitions 来获取并验证节点定义。
如果过程中出现失败,错误会被记录,并且程序会引发异常。
INFO:__main__:Node Definitions: {'Movie' : {'Release Year' : "row['Release Year']" , 'Title' : "row['Title']" }, 'Director' : {'Name' : "row['Director']" }, 'Cast' : {'Actor' : "row['Cast']" }, 'Genre' : {'Type' : "row['Genre']" }, 'Plot' : {'Description' : "row['Plot']" }}
一旦节点被定义,我们就可以识别它们之间的关系。接下来,我们来看看完整的代码是怎样的:
class RelationshipIdentifier :
"""识别图数据库中节点之间的关系。"""
RELATIONSHIP_EXAMPLE = [
("NodeLabel1" , "RelationshipLabel" , "NodeLabel2" ),
("NodeLabel1" , "RelationshipLabel" , "NodeLabel3" ),
("NodeLabel2" , "RelationshipLabel" , "NodeLabel3" ),
]
PROMPT_TEMPLATE = PromptTemplate(
input_variables=["structure" , "node_definitions" , "example" ],
template="""
考虑以下数据集结构:\n{structure}\n
考虑以下节点定义:\n{node_definitions}\n
根据数据集结构和节点定义,识别节点之间的关系(边)。
以三元组的形式返回关系,其中每个三元组包含起始节点标签、关系标签和结束节点标签,每个三元组是一个元组。
请仅返回元组列表。请不要使用三重反引号标识代码块,只返回元组列表。
示例:
{example}
"""
)
def __init__ (self, llm: Any , logger: logging.Logger = None ):
self .llm = llm
self .logger = logger or logging.getLogger(__name__)
self .chain = self .PROMPT_TEMPLATE | self .llm
def validate_relationships (self, relationships: List [Tuple ] ) -> bool :
"""验证关系结构"""
return all (
isinstance (rel, tuple ) and
len (rel) == 3 and
all (isinstance (x, str ) for x in rel)
for rel in relationships
)
@retry(stop=stop_after_attempt(3 ), wait=wait_exponential(multiplier=1 , min =4 , max =10 ) )
def identify_relationships (self, structure: str , node_definitions: Dict ) -> List [Tuple ]:
"""识别关系并应用重试逻辑"""
try :
response = self .chain.invoke({
"structure" : structure,
"node_definitions" : str (node_definitions),
"example" : str (self .RELATIONSHIP_EXAMPLE)
})
relationships = ast.literal_eval(response)
if not self .validate_relationships(relationships):
raise ValueError("无效的关系结构" )
self .logger.info(f"已验证 {len (relationships)} 个关系" )
return relationships
except Exception as e:
self .logger.error(f"验证关系时出现错误:{e} " )
raise
def get_relationship_types (self ) -> List [str ]:
"""提取唯一的关系类型。"""
return list (set (rel[1 ] for rel in self .identify_relationships()))
identifier = RelationshipIdentifier(llm=llm)
relationships = identifier.identify_relationships(node_structure, node_definitions)
print ("关系:" , relationships)
由于这段代码需要进行比节点生成更多的操作,我们将代码组织在一个类中 —— RelationshipIdentifier —— 以封装所有关系提取、验证和日志记录的逻辑。我们使用类似的逻辑,因此我们提供一个关系示例:
RELATIONSHIP_EXAMPLE = [
("NodeLabel1" , "RelationshipLabel" , "NodeLabel2" ),
("NodeLabel1" , "RelationshipLabel" , "NodeLabel3" ),
("NodeLabel2" , "RelationshipLabel" , "NodeLabel3" ),
]
● 起始节点标签:源节点的标签(例如:Movie)。
● 关系标签:连接类型(例如:DIRECTED_BY)。
● 结束节点标签:目标节点的标签(例如:Director)。
PROMPT_TEMPLATE = PromptTemplate(
input_variables=["structure" , "node_definitions" , "example" ],
template="""
考虑以下数据集结构:\n{structure}\n
考虑以下节点定义:\n{node_definitions}\n
根据数据集结构和节点定义,识别节点之间的关系(边)。
以三元组的形式返回关系,其中每个三元组包含起始节点标签、关系标签和结束节点标签,每个三元组是一个元组。
请仅返回元组列表。请不要使用三重反引号标识代码块,只返回元组列表。
示例:
{example}
"""
)
● structure:数据集结构,列出了列和示例值。我在本节开始时定义了它。
● node_definitions:节点标签及其属性的字典。这些节点是在上一节中由 LLM 生成的。
def __init__ (self, llm: Any , logger: logging.Logger = None ):
self .llm = llm
self .logger = logger or logging.getLogger(__name__)
self .chain = self .PROMPT_TEMPLATE | self .llm
● llm:用于处理提示的语言模型(例如:GPT-4o-mini)。
● logger:可选参数,用于记录进度和错误(如果未提供,则默认为标准日志记录器)。
● self.chain:将提示词模板与 LLM 结合,创建一个可重用的管道。
类似之前的做法,我们创建一个方法来验证生成的关系:
def validate_relationships (self, relationships: List [Tuple ] ) -> bool :
"""验证关系结构。"""
return all (
isinstance (rel, tuple ) and
len (rel) == 3 and
all (isinstance (x, str ) for x in rel)
for rel in relationships
)
该方法检查每个项目是否是元组,确保每个元组包含三个元素,并且所有元素都是字符串(例如:节点标签或关系类型)。最后,如果满足这些条件,则返回 TRUE,否则返回 FALSE。
@retry(stop=stop_after_attempt(3 ), wait=wait_exponential(multiplier=1 , min =4 , max =10 ) )
def identify_relationships (self, structure: str , node_definitions: Dict ) -> List [Tuple ]:
"""识别关系并应用重试逻辑。"""
try :
response = self .chain.invoke({
"structure" : structure,
"node_definitions" : str (node_definitions),
"example" : str (self .RELATIONSHIP_EXAMPLE)
})
relationships = ast.literal_eval(response)
if not self .validate_relationships(relationships):
raise ValueError("无效的关系结构" )
self .logger.info(f"已验证 {len (relationships)} 个关系" )
return relationships
我们再次使用 retry 装饰器来在失败时重新尝试 LLM 链,并以类似于节点生成时的方式调用链。
此外,我们使用 ast.literal_eval 将 LLM 的字符串输出转换成 Python 列表,并使用 validate_relationships 来确保输出格式正确。
except Exception as e:
self .logger.error(f"Error identifying relationships: {e} " )
raise
最后一个方法返回唯一的关系标签(例如:DIRECTED_BY、ACTED_IN):
def get_relationship_types (self ) -> List [str ]:
"""Extract unique relationship types."""
return list (set (rel[1 ] for rel in self .identify_relationships()))
它调用 identify_relationships 方法来获取关系列表。然后,它提取每个元组中的第二个元素(关系标签),使用 set 来去除重复项,并将结果转换回列表。
identifier = RelationshipIdentifier(llm=llm)
relationships = identifier.identify_relationships(node_structure, node_definitions)
print ("Relationships:" , relationships)
如果 LLM 在 3 次尝试内成功,它将返回一个类似以下内容的关系列表,以元组格式表示:
INFO:__main__:Identified 4 relationships
Relationships: [('Movie' , 'Directed By' , 'Director' ), ('Movie' , 'Starring' , 'Cast' ), ('Movie' , 'Has Genre' , 'Genre' ), ('Movie' , 'Contains Plot' , 'Plot' )]
在节点和关系定义完成后,我创建了 Cypher 查询将它们加载到 Neo4j 中。这个过程遵循与节点生成和关系生成类似的逻辑。然而,我们增加了几个额外的步骤来进行验证,因为生成的输出将用于将数据加载到我们的知识图谱中。因此,我们需要尽可能提高成功的概率。让我们首先看看完整的代码:
class CypherQueryBuilder :
"""构建用于 Neo4j 图数据库的 Cypher 查询。"""
INPUT_EXAMPLE = """
NodeLabel1: value1, value2
NodeLabel2: value1, value2
"""
EXAMPLE_CYPHER = example_cypher = """
CREATE (n1:NodeLabel1 {property1: "row['property1']", property2: "row['property2']"})
CREATE (n2:NodeLabel2 {property1: "row['property1']", property2: "row['property2']"})
CREATE (n1)-[:RelationshipLabel]->(n2);
"""
PROMPT_TEMPLATE = PromptTemplate(
input_variables=["structure" , "node_definitions" , "relationships" , "example" ],
template="""
考虑以下节点定义:\n{node_definitions}\n
考虑以下关系:\n{relationships}\n
生成 Cypher 查询以创建节点和关系,使用下面的节点定义和关系。记得用数据集中的实际数据替换占位符值。
包括每个节点的所有属性,按照节点定义,并创建关系。
返回一个包含每个查询用分号分隔的单个字符串。
请不要在响应中包含任何其他文本或引号。
请仅返回包含 Cypher 查询的字符串。请不要使用三重反引号标识代码块。
示例输入:
{input}
示例输出Cypher查询:
{cypher}
"""
)
def __init__ (self, llm: Any , logger: logging.Logger = None ):
self .llm = llm
self .logger = logger or logging.getLogger(__name__)
self .chain = self .PROMPT_TEMPLATE | self .llm
def validate_cypher_query (self, query: str ) -> bool :
"""使用 LLM 和正则表达式模式验证 Cypher 查询语法。"""
VALIDATION_PROMPT = PromptTemplate(
input_variables=["query" ],
template="""
验证此 Cypher 查询并返回 TRUE 或 FALSE:
查询:{query}
检查规则:
1. 有效的 CREATE 语句
2. 正确的属性格式
3. 有效的关系语法
4. 无缺失的括号
5. 有效的属性名称
6. 有效的关系类型
如果查询有效,返回 TRUE;如果无效,返回 FALSE。
"""
)
try :
basic_valid = all (re.search(pattern, query) for pattern in [
r'CREATE \(' ,
r'\{.*?\}' ,
r'\)-\[:.*?\]->'
])
if not basic_valid:
return False
validation_chain = VALIDATION_PROMPT | self .llm
result = validation_chain.invoke({"query" : query})
is_valid = "TRUE" in result.upper()
if not is_valid:
self .logger.warning(f"LLM 验证查询失败:{query} " )
return is_valid
except Exception as e:
self .logger.error(f"验证错误:{e} " )
return False
def sanitize_query (self, query: str ) -> str :
"""清理并格式化 Cypher 查询"""
return (
query
.strip()
.replace('\n' , ' ' )
.replace(' ' , ' ' )
.replace("'row[" , "row['" )
.replace("]'" , "']" ))
@retry(stop=stop_after_attempt(3 ), wait=wait_exponential(multiplier=1 , min =4 , max =10 ) )
def build_queries (self, node_definitions: Dict , relationships: List ) -> str :
"""构建带有重试逻辑的 Cypher 查询。"""
try :
response = self .chain.invoke({
"node_definitions" : str (node_definitions),
"relationships" : str (relationships),
"input" : self .INPUT_EXAMPLE,
"cypher" : self .EXAMPLE_CYPHER
})
if '```' in response:
response = response.split('```' )[1 ]
queries = self .sanitize_query(response)
if not self .validate_cypher_query(queries):
raise ValueError("无效的 Cypher 查询语法" )
self .logger.info("成功生成 Cypher 查询" )
return queries
except Exception as e:
self .logger.error(f"构建 Cypher 查询出错:{e} " )
raise
def split_queries (self, queries: str ) -> List [str ]:
"""将组合的查询拆分为单独的语句。"""
return [q.strip() for q in queries.split(';' ) if q.strip()]
builder = CypherQueryBuilder(llm=llm)
cypher_queries = builder.build_queries(node_definitions, relationships)
print ("Cypher 查询:" , cypher_queries)
PROMPT_TEMPLATE = PromptTemplate(
input_variables=["structure" , "node_definitions" , "relationships" , "example" ],
template="""
考虑以下节点定义:\n{node_definitions}\n
考虑以下关系:\n{relationships}\n
生成 Cypher 查询以创建节点和关系,使用下面的节点定义和关系。记得用数据集中的实际数据替换占位符值。
包括每个节点的所有属性,按照节点定义,并创建关系。
返回一个包含每个查询用分号分隔的单个字符串。
请不要在响应中包含任何其他文本或引号。
请仅返回包含 Cypher 查询的字符串。请不要使用三重反引号标识代码块。
示例输入:
{input}
示例输出Cypher查询:
{cypher}
"""
)
● node_definitions:生成的节点及其属性。
● relationships:节点之间生成的关系。
def __init__ (self, llm: Any , logger: logging.Logger = None ):
self .llm = llm
self .logger = logger or logging.getLogger(__name__)
self .chain = self .PROMPT_TEMPLATE | self .llm
def validate_cypher_query (self, query: str ) -> bool :
"""使用 LLM 和正则表达式模式验证 Cypher 查询语法。"""
VALIDATION_PROMPT = PromptTemplate(
input_variables=["query" ],
template="""
验证此 Cypher 查询并返回 TRUE 或 FALSE:
查询:{query}
检查规则:
1. 有效的 CREATE 语句
2. 正确的属性格式
3. 有效的关系语法
4. 无缺失的括号
5. 有效的属性名称
6. 有效的关系类型
如果查询有效,返回 TRUE;如果无效,返回 FALSE。
"""
)
try :
basic_valid = all (re.search(pattern, query) for pattern in [
r'CREATE \(' ,
r'\{.*?\}' ,
r'\)-\[:.*?\]->'
])
if not basic_valid:
return False
validation_chain = VALIDATION_PROMPT | self .llm
result = validation_chain.invoke({"query" : query})
is_valid = "TRUE" in result.upper()
if not is_valid:
self .logger.warning(f"LLM 验证查询失败:{query} " )
return is_valid
except Exception as e:
self .logger.error(f"验证错误:{e} " )
return False
该方法执行两个验证步骤。首先是使用正则表达式进行基本验证:
basic_valid = all (re.search(pattern, query) for pattern in [
r'CREATE \(' ,
r'\{.*?\}' ,
r'\)-\[:.*?\]->'
])
if not basic_valid:
return False
validation_chain = VALIDATION_PROMPT | self .llm
result = validation_chain.invoke({"query" : query})
is_valid = "TRUE" in result.upper()
有效的 CREATE 语句
正确的属性格式
有效的关系语法
无缺失的括号
有效的属性名称
有效的关系类型
到目前为止一切看上去工作的都还不错,这里,让我再添加一个方法,进一步清理生成的输出:
def sanitize_query (self, query: str ) -> str :
"""清理并格式化 Cypher 查询。"""
return (
query
.strip()
.replace('\n' , ' ' )
.replace(' ' , ' ' )
.replace("'row[" , "row['" )
.replace("]'" , "']" ))
我将移除不必要的空格以及换行符(\n),并修复与数据集引用相关的潜在格式问题(例如:row['property1'])。
请根据你所使用的大语言模型考虑更新此方法,较小参数量的模型可能需要更多的数据清理操作。
@retry(stop=stop_after_attempt(3 ), wait=wait_exponential(multiplier=1 , min =4 , max =10 ) )
def build_queries (self, node_definitions: Dict , relationships: List ) -> str :
"""构建带有重试逻辑的 Cypher 查询。"""
try :
response = self .chain.invoke({
"node_definitions" : str (node_definitions),
"relationships" : str (relationships),
"input" : self .INPUT_EXAMPLE,
"cypher" : self .EXAMPLE_CYPHER
})
if '```' in response:
response = response.split('```' )[1 ]
queries = self .sanitize_query(response)
if not self .validate_cypher_query(queries):
raise ValueError("无效的 Cypher 查询语法" )
self .logger.info("成功生成 Cypher 查询" )
return queries
except Exception as e:
self .logger.error(f"构建 Cypher 查询时出错:{e} " )
raise
这个方法与关系构建器类中的方法类似,唯一的不同之处是:
if '```' in response:
response = response.split('```' )[1 ]
在这里,LLM 可能会提供额外的 Markdown 格式来指定它是一个代码块。如果 LLM 的响应中存在这种格式,我只会提取三重反引号内的代码。
接下来,我定义一个方法,将单一的 Cypher 查询字符串拆分成单独的语句:
def split_queries (self, queries: str ) -> List [str ]:
"""将组合的查询拆分为单独的语句"""
return [q.strip() for q in queries.split(';' ) if q.strip()]
CREATE (n1:Movie {title: "Inception" }); CREATE (n2:Director {name: "Nolan" });
["CREATE (n1:Movie {title: 'Inception'})" , "CREATE (n2:Director {name: 'Nolan'})" ]
builder = CypherQueryBuilder(llm=llm)
cypher_queries = builder.build_queries(node_definitions, relationships)
print ("Cypher 查询:" , cypher_queries)
INFO:__main__:Successfully generated Cypher queries
Cypher Queries: CREATE (m:Movie {Release_Year: "row['Release Year']" , Title: "row['Title']" }) CREATE (d:Director {Name: "row['Director']" }) CREATE (c:Cast {Actor: "row['Cast']" }) CREATE (g:Genre {Type : "row['Genre']" }) CREATE (p:Plot {Description: "row['Plot']" }) CREATE (m)-[:Directed_By]->(d) CREATE (m)-[:Starring]->(c) CREATE (m)-[:Has_Genre]->(g) CREATE (m)-[:Contains_Plot]->(p)
最后,遍历数据集,并为每一行执行生成的 Cypher 查询。
logs = ""
total_rows = len (df)
def sanitize_value (value ):
if isinstance (value, str ):
return value.replace('"' , '' )
return str (value)
for index, row in tqdm(df.iterrows(),
total=total_rows,
desc="正在加载数据到 Neo4j" ,
position=0 ,
leave=True ):
cypher_query = cypher_queries
for column in df.columns:
cypher_query = cypher_query.replace(
f"row['{column} ']" ,
f'{sanitize_value(row[column])} '
)
try :
conn.execute_query(cypher_query)
except Exception as e:
logs += f"在行 {index+1 } : {str (e)} 出现错误\n"
请注意,我定义了一个空字符串变量 logs,用于捕获潜在的失败。我还添加了一个清理函数,用于传递给每个行输入的值:
def sanitize_value (value ):
if isinstance (value, str ):
return value.replace('"' , '' )
return str (value)
for index, row in tqdm(df.iterrows(),
total=total_rows,
desc="正在加载数据到 Neo4j" ,
position=0 ,
leave=True ):
cypher_query = cypher_queries
for column in df.columns:
cypher_query = cypher_query.replace(
f"row['{column} ']" ,
f'{sanitize_value(row[column])} '
)
try :
conn.execute_query(cypher_query)
except Exception as e:
logs += f"在行 {index+1 } : {str (e)} 出现错误\n"
正如我在练习开始时提到的,我使用 tqdm 为进度条添加了一个漂亮的外观,以可视化的方式显示处理了多少行数据。我传递了 df.iterrows() 来遍历 DataFrame,提供索引和行数据。total=total_rows 由 tqdm 用于计算进度。添加 desc="正在加载数据到 Neo4j" 来为进度条提供标签。最后,position=0, leave=True 确保进度条在控制台中保持可见。
接下来,我将像 row['column_name'] 这样的占位符替换成实际的数据集值,将每个值传递给 sanitize_value 函数,并执行查询。
让我们检查一下数据集是否已上传。切换到 Neo4j,并运行以下 Cypher 查询:
MATCH p=(m:Movie)-[r]-(n)
RETURN p
LIMIT 100 ;
这与我们手动上传的知识图谱非常相似。对于一个简单的 LLM 来说,这还不赖对吧。虽然这需要相当多的编码工作,但我们现在可以将其重用于多个数据集,更重要的是,可以将其作为基础,创建更复杂的 LLM 图形构建器。
在我提供的示例中,还没有通过提供实体、关系和属性来帮助 LLM。然而,考虑将它们作为示例来提高 LLM 的性能。此外,更现代化的方法利用思维链来提出额外的节点和关系。这使得模型能够顺序推理并进一步改进结果。另一种策略是提供行样本,以更好的适应每行中提供的值。
本文详细介绍了从零开始构建知识图谱的技术路径。通过模块化设计(节点生成、关系识别、Cypher 构建),实现了数据到图结构的自动化转换。引入重试机制和双重验证(正则 + LLM)显著提升了稳定性。虽然当前实现依赖大量编码,但为后续优化 GraphRAG 系统奠定了坚实基础。在实际生产中,建议结合 Few-Shot Prompting 和思维链技术进一步提升准确率。