Flask实现Neo4j知识图谱Web应用

创建一个完整的Flask Web应用,用于管理和可视化Neo4j知识图谱。
1. 项目结构
text
flask_kg_app/ │ ├── app.py # 主应用文件 ├── requirements.txt # 依赖包 ├── config.py # 配置文件 ├── .env # 环境变量 │ ├── static/ # 静态文件 │ ├── css/ │ ├── js/ │ └── images/ │ ├── templates/ # HTML模板 │ ├── base.html │ ├── index.html │ ├── query.html │ ├── visualize.html │ ├── manage.html │ └── dashboard.html │ ├── utils/ # 工具模块 │ ├── neo4j_connector.py │ ├── kg_builder.py │ └── visualizer.py │ └── data/ # 数据文件 ├── sample_data.csv └── imports/
2. 配置文件
config.py
python
import os from dotenv import load_dotenv load_dotenv() class Config: """应用配置""" # 基础配置 SECRET_KEY = os.getenv('SECRET_KEY', 'your-secret-key-here') # Neo4j配置 NEO4J_URI = os.getenv('NEO4J_URI', 'bolt://localhost:7687') NEO4J_USER = os.getenv('NEO4J_USER', 'neo4j') NEO4J_PASSWORD = os.getenv('NEO4J_PASSWORD', 'password') # 应用配置 UPLOAD_FOLDER = os.path.join(os.path.dirname(__file__), 'data/uploads') MAX_CONTENT_LENGTH = 16 * 1024 * 1024 # 16MB # 允许的文件扩展名 ALLOWED_EXTENSIONS = {'csv', 'json', 'txt'} @staticmethod def init_app(app): """初始化应用配置""" # 确保上传目录存在 if not os.path.exists(Config.UPLOAD_FOLDER): os.makedirs(Config.UPLOAD_FOLDER)
.env
env
SECRET_KEY=your-secret-key-change-this NEO4J_URI=bolt://localhost:7687 NEO4J_USER=neo4j NEO4J_PASSWORD=your-password DEBUG=True
3. Neo4j连接工具
utils/neo4j_connector.py
python
from neo4j import GraphDatabase, BoltDriver from typing import List, Dict, Any, Optional import pandas as pd import json class Neo4jConnector: """Neo4j数据库连接管理器""" def __init__(self, uri: str, user: str, password: str): """ 初始化Neo4j连接 :param uri: Neo4j连接URI :param user: 用户名 :param password: 密码 """ self.uri = uri self.user = user self.password = password self.driver = None self._connect() def _connect(self): """建立数据库连接""" try: self.driver = GraphDatabase.driver( self.uri, auth=(self.user, self.password), max_connection_lifetime=30 * 60, connection_timeout=15, connection_acquisition_timeout=2 * 60 ) # 测试连接 with self.driver.session() as session: session.run("RETURN 1") print("Neo4j连接成功") except Exception as e: print(f"Neo4j连接失败: {e}") self.driver = None def close(self): """关闭数据库连接""" if self.driver: self.driver.close() def is_connected(self) -> bool: """检查连接状态""" try: with self.driver.session() as session: session.run("RETURN 1") return True except: return False def execute_query(self, query: str, params: dict = None) -> List[Dict]: """ 执行Cypher查询 :param query: Cypher查询语句 :param params: 查询参数 :return: 查询结果列表 """ if not self.is_connected(): self._connect() try: with self.driver.session() as session: result = session.run(query, parameters=params or {}) records = [] for record in result: # 将Record对象转换为字典 record_dict = {} for key in record.keys(): value = record[key] # 处理Neo4j的特殊类型 if hasattr(value, '__dict__'): value = dict(value) record_dict[key] = value records.append(record_dict) return records except Exception as e: print(f"查询执行失败: {e}") return [] def get_database_info(self) -> Dict[str, Any]: """获取数据库信息""" queries = { "节点统计": """ CALL db.labels() YIELD label CALL apoc.cypher.run('MATCH (n:`' + label + '`) RETURN count(*) as count', {}) YIELD value RETURN label, value.count as count ORDER BY label """, "关系统计": """ CALL db.relationshipTypes() YIELD relationshipType CALL apoc.cypher.run('MATCH ()-[r:`' + relationshipType + '`]->() RETURN count(*) as count', {}) YIELD value RETURN relationshipType, value.count as count ORDER BY relationshipType """, "属性统计": """ MATCH (n) UNWIND keys(n) as key RETURN key, count(*) as count, collect(distinct apoc.convert.toString(n[key]))[0..5] as sample_values ORDER BY count DESC LIMIT 10 """ } info = {"状态": "已连接" if self.is_connected() else "未连接"} if self.is_connected(): for name, query in queries.items(): try: info[name] = self.execute_query(query) except: info[name] = [] return info def import_csv(self, filepath: str, label: str = None) -> Dict[str, Any]: """ 从CSV文件导入数据 :param filepath: CSV文件路径 :param label: 节点标签 :return: 导入结果 """ try: df = pd.read_csv(filepath) if 'label' in df.columns and not label: # 使用CSV中的label列 labels = df['label'].unique() results = {} for lbl in labels: nodes_df = df[df['label'] == lbl] properties = {} for _, row in nodes_df.iterrows(): node_props = {} for col in nodes_df.columns: if col != 'label' and pd.notna(row[col]): node_props[col] = row[col] query = f""" CREATE (n:{lbl} $props) RETURN id(n) as id """ result = self.execute_query(query, {"props": node_props}) if result: node_id = result[0]['id'] properties[str(node_id)] = node_props results[lbl] = { "count": len(nodes_df), "properties": properties } return { "success": True, "message": f"成功导入 {len(df)} 个节点", "results": results } else: # 使用指定的label nodes_created = 0 properties = {} for _, row in df.iterrows(): props = {} for col in df.columns: if pd.notna(row[col]): props[col] = row[col] query = f""" CREATE (n:{label} $props) RETURN id(n) as id """ result = self.execute_query(query, {"props": props}) if result: node_id = result[0]['id'] properties[str(node_id)] = props nodes_created += 1 return { "success": True, "message": f"成功导入 {nodes_created} 个节点", "results": {label: {"count": nodes_created, "properties": properties}} } except Exception as e: return { "success": False, "message": f"导入失败: {str(e)}" } def export_to_json(self, filename: str = "knowledge_graph.json") -> Dict[str, Any]: """导出知识图谱为JSON格式""" try: # 获取所有节点" MATCH (n) RETURN id(n) as id, labels(n) as labels, properties(n) as properties """ # 获取所有关系" MATCH (a)-[r]->(b) RETURN id(a) as start_id, type(r) as type, properties(r) as properties, id(b) as end_id, labels(a) as start_labels, labels(b) as end_labels """ nodes = self.execute_query(nodes_query) relationships = self.execute_query(relationships_query) # 构建图结构 graph_data = { "nodes": nodes, "relationships": relationships, "metadata": { "node_count": len(nodes), "relationship_count": len(relationships), "export_time": pd.Timestamp.now().isoformat() } } # 保存到文件 with open(filename, 'w', encoding='utf-8') as f: json.dump(graph_data, f, ensure_ascii=False, indent=2) return { "success": True, "message": f"数据已导出到 {filename}", "filepath": filename, "stats": { "nodes": len(nodes), "relationships": len(relationships) } } except Exception as e: return { "success": False, "message": f"导出失败: {str(e)}" } def clear_database(self) -> Dict[str, Any]: """清空数据库""" try: query = "MATCH (n) DETACH DELETE n" self.execute_query(query) # 重建索引 self.execute_query("CREATE CONSTRAINT IF NOT EXISTS FOR (n:Person) REQUIRE n.id IS UNIQUE") self.execute_query("CREATE CONSTRAINT IF NOT EXISTS FOR (n:Company) REQUIRE n.id IS UNIQUE") self.execute_query("CREATE INDEX IF NOT EXISTS FOR (n:Person) ON (n.name)") self.execute_query("CREATE INDEX IF NOT EXISTS FOR (n:Company) ON (n.name)") return { "success": True, "message": "数据库已清空并重建索引" } except Exception as e: return { "success": False, "message": f"清空失败: {str(e)}" }
4. Flask主应用
app.py
python
from flask import Flask, render_template, request, jsonify, redirect, url_for, flash, send_file from flask_cors import CORS import os import json from datetime import datetime from config import Config from utils.neo4j_connector import Neo4jConnector from utils.kg_builder import KnowledgeGraphBuilder from utils.visualizer import KGVisualizer # 初始化应用 app = Flask(__name__) app.config.from_object(Config) CORS(app) # 初始化Neo4j连接 neo4j_conn = Neo4jConnector( uri=app.config['NEO4J_URI'], user=app.config['NEO4J_USER'], password=app.config['NEO4J_PASSWORD'] ) # 初始化知识图谱构建器 kg_builder = KnowledgeGraphBuilder(neo4j_conn) @app.route('/') def index(): """首页""" db_info = neo4j_conn.get_database_info() return render_template('index.html', db_info=db_info) @app.route('/dashboard') def dashboard(): """仪表盘""" # 获取统计信息" MATCH (n) WITH labels(n) as labels UNWIND labels as label RETURN label, count(*) as count ORDER BY count DESC """ stats = neo4j_conn.execute_query(stats_query) # 获取最近添加的节点" MATCH (n) WHERE n.created_at IS NOT NULL RETURN n.name as name, labels(n)[0] as label, n.created_at as created_at ORDER BY n.created_at DESC LIMIT 10 """ recent_nodes = neo4j_conn.execute_query(recent_query) # 获取数据库信息 db_info = neo4j_conn.get_database_info() return render_template('dashboard.html', stats=stats, recent_nodes=recent_nodes, db_info=db_info) @app.route('/query', methods=['GET', 'POST']) def query(): """查询页面""" results = [] if request.method == 'POST': query_text = request.form.get('query', '') if query_text: results = neo4j_conn.execute_query(query_text) # 获取预定义查询模板 query_templates = [ { "name": "查找所有人物", "query": "MATCH (p:Person) RETURN p.name as name, p.age as age, p.profession as profession LIMIT 50" }, { "name": "查找所有关系", "query": "MATCH (a)-[r]->(b) RETURN a.name as from, type(r) as relation, b.name as to LIMIT 50" }, { "name": "查找朋友的朋友", "query": "MATCH (p:Person {name: 'Alice'})-[:FRIEND*2]->(fof) RETURN DISTINCT fof.name as name" }, { "name": "查找所有公司及其员工", "query": "MATCH (c:Company)<-[:WORKS_AT]-(p:Person) RETURN c.name as company, collect(p.name) as employees" } ] return render_template('query.html', results=results, query_text=query_text, query_templates=query_templates) @app.route('/visualize') def visualize(): """可视化页面""" # 获取所有节点类型用于筛选 node_labels_query = "CALL db.labels() YIELD label RETURN label" node_labels = [item['label'] for item in neo4j_conn.execute_query(node_labels_query)] # 获取所有关系类型 rel_types_query = "CALL db.relationshipTypes() YIELD relationshipType RETURN relationshipType" rel_types = [item['relationshipType'] for item in neo4j_conn.execute_query(rel_types_query)] # 获取示例数据用于预览" MATCH (n) RETURN labels(n)[0] as label, properties(n) as properties LIMIT 5 """ sample_data = neo4j_conn.execute_query(sample_query) return render_template('visualize.html', node_labels=node_labels, rel_types=rel_types, sample_data=sample_data) @app.route('/api/graph_data', methods=['GET']) def get_graph_data(): """获取图谱数据API""" try: # 获取查询参数 label_filter = request.args.get('label', '') limit = int(request.args.get('limit', 100)) # 构建查询 if label_filter: query = f""" MATCH (n:{label_filter})-[r]-(m) RETURN id(n) as source_id, labels(n)[0] as source_label, properties(n) as source_props, id(m) as target_id, labels(m)[0] as target_label, properties(m) as target_props, type(r) as relationship_type, properties(r) as relationship_props LIMIT {limit} """ else: query = f""" MATCH (n)-[r]-(m) RETURN id(n) as source_id, labels(n)[0] as source_label, properties(n) as source_props, id(m) as target_id, labels(m)[0] as target_label, properties(m) as target_props, type(r) as relationship_type, properties(r) as relationship_props LIMIT {limit} """ results = neo4j_conn.execute_query(query) # 构建节点和边数据 nodes = {} edges = [] node_counter = 0 for record in results: # 处理源节点 source_id = record['source_id'] if source_id not in nodes: nodes[source_id] = { "id": node_counter, "neo4j_id": source_id, "label": record['source_label'], "properties": record['source_props'], "name": record['source_props'].get('name', f"Node_{source_id}") } node_counter += 1 # 处理目标节点 target_id = record['target_id'] if target_id not in nodes: nodes[target_id] = { "id": node_counter, "neo4j_id": target_id, "label": record['target_label'], "properties": record['target_props'], "name": record['target_props'].get('name', f"Node_{target_id}") } node_counter += 1 # 添加边 edges.append({ "source": nodes[source_id]["id"], "target": nodes[target_id]["id"], "type": record['relationship_type'], "properties": record['relationship_props'] }) return jsonify({ "success": True, "nodes": list(nodes.values()), "edges": edges, "count": { "nodes": len(nodes), "edges": len(edges) } }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route('/manage', methods=['GET', 'POST']) def manage(): """管理页面""" message = None message_type = None if request.method == 'POST': action = request.form.get('action') if action == 'clear': result = neo4j_conn.clear_database() message = result['message'] message_type = 'success' if result['success'] else 'error' elif action == 'export': filename = f"kg_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" result = neo4j_conn.export_to_json(filename) message = result['message'] message_type = 'success' if result['success'] else 'error' if result['success']: return send_file(filename, as_attachment=True) elif action == 'import_csv': if 'csv_file' in request.files: file = request.files['csv_file'] if file.filename != '': filename = os.path.join(app.config['UPLOAD_FOLDER'], file.filename) file.save(filename) label = request.form.get('label', 'Node') result = neo4j_conn.import_csv(filename, label) message = result['message'] message_type = 'success' if result['success'] else 'error' elif action == 'create_sample': result = kg_builder.create_sample_data() message = result['message'] message_type = 'success' if result['success'] else 'error' return render_template('manage.html', message=message, message_type=message_type) @app.route('/api/create_node', methods=['POST']) def create_node(): """创建节点API""" try: data = request.json label = data.get('label') properties = data.get('properties', {}) # 添加创建时间戳 properties['created_at'] = datetime.now().isoformat() query = f""" CREATE (n:{label} $props) RETURN id(n) as id, labels(n) as labels, properties(n) as properties """ result = neo4j_conn.execute_query(query, {"props": properties}) if result: return jsonify({ "success": True, "message": "节点创建成功", "node": result[0] }) else: return jsonify({ "success": False, "message": "节点创建失败" }), 400 except Exception as e: return jsonify({ "success": False, "message": str(e) }), 500 @app.route('/api/create_relationship', methods=['POST']) def create_relationship(): """创建关系API""" try: data = request.json source_id = data.get('source_id') target_id = data.get('target_id') rel_type = data.get('type') properties = data.get('properties', {}) query = f""" MATCH (a) WHERE id(a) = $source_id MATCH (b) WHERE id(b) = $target_id CREATE (a)-[r:{rel_type} $props]->(b) RETURN id(r) as id, type(r) as type, properties(r) as properties """ params = { "source_id": source_id, "target_id": target_id, "props": properties } result = neo4j_conn.execute_query(query, params) if result: return jsonify({ "success": True, "message": "关系创建成功", "relationship": result[0] }) else: return jsonify({ "success": False, "message": "关系创建失败" }), 400 except Exception as e: return jsonify({ "success": False, "message": str(e) }), 500 @app.route('/api/search', methods=['GET']) def search(): """搜索API""" try: keyword = request.args.get('q', '') search_type = request.args.get('type', 'both') # node, relation, both results = [] if search_type in ['node', 'both']: # 搜索节点" MATCH (n) WHERE n.name CONTAINS $keyword OR n.id CONTAINS $keyword RETURN id(n) as id, labels(n) as labels, properties(n) as properties, 'node' as type LIMIT 20 """ nodes = neo4j_conn.execute_query(node_query, {"keyword": keyword}) results.extend(nodes) if search_type in ['relation', 'both']: # 搜索关系" MATCH (a)-[r]-(b) WHERE type(r) CONTAINS $keyword RETURN id(r) as id, type(r) as type, properties(r) as properties, id(a) as source_id, id(b) as target_id, 'relationship' as result_type LIMIT 20 """ relationships = neo4j_conn.execute_query(rel_query, {"keyword": keyword}) results.extend(relationships) return jsonify({ "success": True, "count": len(results), "results": results }) except Exception as e: return jsonify({ "success": False, "message": str(e) }), 500 @app.route('/api/statistics') def get_statistics(): """获取统计数据API""" try: # 节点统计 node_stats = neo4j_conn.execute_query(""" CALL db.labels() YIELD label CALL apoc.cypher.run('MATCH (n:`' + label + '`) RETURN count(*) as count', {}) YIELD value RETURN label, value.count as count ORDER BY count DESC """) # 关系统计 rel_stats = neo4j_conn.execute_query(""" CALL db.relationshipTypes() YIELD relationshipType CALL apoc.cypher.run('MATCH ()-[r:`' + relationshipType + '`]->() RETURN count(*) as count', {}) YIELD value RETURN relationshipType, value.count as count ORDER BY count DESC """) # 属性统计 prop_stats = neo4j_conn.execute_query(""" MATCH (n) UNWIND keys(n) as key RETURN key, count(*) as count ORDER BY count DESC LIMIT 10 """) return jsonify({ "success": True, "statistics": { "nodes_by_label": node_stats, "relationships_by_type": rel_stats, "top_properties": prop_stats } }) except Exception as e: return jsonify({ "success": False, "message": str(e) }), 500 @app.route('/health') def health_check(): """健康检查""" db_status = neo4j_conn.is_connected() return jsonify({ "status": "healthy" if db_status else "unhealthy", "database": "connected" if db_status else "disconnected", "timestamp": datetime.now().isoformat() }) @app.teardown_appcontext def teardown_db(exception): """关闭数据库连接""" neo4j_conn.close() if __name__ == '__main__': app.run(debug=True, port=5000, host='0.0.0.0')
5. 知识图谱构建器
utils/kg_builder.py
python
class KnowledgeGraphBuilder: """知识图谱构建器""" def __init__(self, neo4j_connector): self.conn = neo4j_connector def create_sample_data(self): """创建示例数据""" try: # 创建电影知识图谱示例 sample_data = [ # 创建电影 ("CREATE (m:Movie {title: 'The Matrix', year: 1999, genre: 'Sci-Fi', rating: 8.7})", {}), ("CREATE (m:Movie {title: 'Inception', year: 2010, genre: 'Sci-Fi', rating: 8.8})", {}), ("CREATE (m:Movie {title: 'The Godfather', year: 1972, genre: 'Crime', rating: 9.2})", {}), # 创建演员 ("CREATE (p:Person {name: 'Keanu Reeves', born: 1964, nationality: 'Canadian'})", {}), ("CREATE (p:Person {name: 'Laurence Fishburne', born: 1961, nationality: 'American'})", {}), ("CREATE (p:Person {name: 'Leonardo DiCaprio', born: 1974, nationality: 'American'})", {}), ("CREATE (p:Person {name: 'Marlon Brando', born: 1924, nationality: 'American'})", {}), # 创建导演 ("CREATE (p:Person {name: 'The Wachowskis', profession: 'Directors'})", {}), ("CREATE (p:Person {name: 'Christopher Nolan', born: 1970, nationality: 'British'})", {}), ("CREATE (p:Person {name: 'Francis Ford Coppola', born: 1939, nationality: 'American'})", {}), # 创建关系 (""" MATCH (a:Person {name: 'Keanu Reeves'}) MATCH (b:Movie {title: 'The Matrix'}) CREATE (a)-[r:ACTED_IN {role: 'Neo'}]->(b) """, {}), (""" MATCH (a:Person {name: 'Laurence Fishburne'}) MATCH (b:Movie {title: 'The Matrix'}) CREATE (a)-[r:ACTED_IN {role: 'Morpheus'}]->(b) """, {}), (""" MATCH (a:Person {name: 'Leonardo DiCaprio'}) MATCH (b:Movie {title: 'Inception'}) CREATE (a)-[r:ACTED_IN {role: 'Cobb'}]->(b) """, {}), (""" MATCH (a:Person {name: 'Marlon Brando'}) MATCH (b:Movie {title: 'The Godfather'}) CREATE (a)-[r:ACTED_IN {role: 'Vito Corleone'}]->(b) """, {}), (""" MATCH (a:Person {name: 'The Wachowskis'}) MATCH (b:Movie {title: 'The Matrix'}) CREATE (a)-[r:DIRECTED]->(b) """, {}), (""" MATCH (a:Person {name: 'Christopher Nolan'}) MATCH (b:Movie {title: 'Inception'}) CREATE (a)-[r:DIRECTED]->(b) """, {}), (""" MATCH (a:Person {name: 'Francis Ford Coppola'}) MATCH (b:Movie {title: 'The Godfather'}) CREATE (a)-[r:DIRECTED]->(b) """, {}) ] for query, params in sample_data: self.conn.execute_query(query, params) return { "success": True, "message": "示例数据创建成功", "data": { "movies": 3, "people": 7, "relationships": 7 } } except Exception as e: return { "success": False, "message": f"创建示例数据失败: {str(e)}" } def build_company_kg(self): """构建公司知识图谱""" try: # 创建公司节点 companies = [ {"name": "Apple", "industry": "Technology", "founded": 1976}, {"name": "Google", "industry": "Technology", "founded": 1998}, {"name": "Microsoft", "industry": "Technology", "founded": 1975} ] # 创建人物节点 people = [ {"name": "Steve Jobs", "role": "Co-founder", "company": "Apple"}, {"name": "Tim Cook", "role": "CEO", "company": "Apple"}, {"name": "Larry Page", "role": "Co-founder", "company": "Google"}, {"name": "Sundar Pichai", "role": "CEO", "company": "Google"}, {"name": "Bill Gates", "role": "Co-founder", "company": "Microsoft"}, {"name": "Satya Nadella", "role": "CEO", "company": "Microsoft"} ] # 创建产品节点 products = [ {"name": "iPhone", "type": "Smartphone", "company": "Apple"}, {"name": "MacBook", "type": "Laptop", "company": "Apple"}, {"name": "Google Search", "type": "Search Engine", "company": "Google"}, {"name": "Android", "type": "Operating System", "company": "Google"}, {"name": "Windows", "type": "Operating System", "company": "Microsoft"}, {"name": "Office", "type": "Productivity Suite", "company": "Microsoft"} ] # 批量创建节点 for company in companies: self.conn.execute_query( "CREATE (c:Company $props)", {"props": company} ) for person in people: self.conn.execute_query( "CREATE (p:Person $props)", {"props": person} ) for product in products: self.conn.execute_query( "CREATE (p:Product $props)", {"props": product} ) # 创建关系 relationships = [ # 人物-公司关系 ("MATCH (p:Person {name: 'Steve Jobs'}) MATCH (c:Company {name: 'Apple'}) CREATE (p)-[:FOUNDED]->(c)", {}), ("MATCH (p:Person {name: 'Tim Cook'}) MATCH (c:Company {name: 'Apple'}) CREATE (p)-[:WORKS_AS {position: 'CEO'}]->(c)", {}), ("MATCH (p:Person {name: 'Larry Page'}) MATCH (c:Company {name: 'Google'}) CREATE (p)-[:FOUNDED]->(c)", {}), ("MATCH (p:Person {name: 'Sundar Pichai'}) MATCH (c:Company {name: 'Google'}) CREATE (p)-[:WORKS_AS {position: 'CEO'}]->(c)", {}), ("MATCH (p:Person {name: 'Bill Gates'}) MATCH (c:Company {name: 'Microsoft'}) CREATE (p)-[:FOUNDED]->(c)", {}), ("MATCH (p:Person {name: 'Satya Nadella'}) MATCH (c:Company {name: 'Microsoft'}) CREATE (p)-[:WORKS_AS {position: 'CEO'}]->(c)", {}), # 产品-公司关系 ("MATCH (p:Product {name: 'iPhone'}) MATCH (c:Company {name: 'Apple'}) CREATE (p)-[:PRODUCED_BY]->(c)", {}), ("MATCH (p:Product {name: 'MacBook'}) MATCH (c:Company {name: 'Apple'}) CREATE (p)-[:PRODUCED_BY]->(c)", {}), ("MATCH (p:Product {name: 'Google Search'}) MATCH (c:Company {name: 'Google'}) CREATE (p)-[:PRODUCED_BY]->(c)", {}), ("MATCH (p:Product {name: 'Android'}) MATCH (c:Company {name: 'Google'}) CREATE (p)-[:PRODUCED_BY]->(c)", {}), ("MATCH (p:Product {name: 'Windows'}) MATCH (c:Company {name: 'Microsoft'}) CREATE (p)-[:PRODUCED_BY]->(c)", {}), ("MATCH (p:Product {name: 'Office'}) MATCH (c:Company {name: 'Microsoft'}) CREATE (p)-[:PRODUCED_BY]->(c)", {}), # 竞争关系 ("MATCH (c1:Company {name: 'Apple'}) MATCH (c2:Company {name: 'Google'}) CREATE (c1)-[:COMPETES_WITH]->(c2)", {}), ("MATCH (c1:Company {name: 'Apple'}) MATCH (c2:Company {name: 'Microsoft'}) CREATE (c1)-[:COMPETES_WITH]->(c2)", {}), ("MATCH (c1:Company {name: 'Google'}) MATCH (c2:Company {name: 'Microsoft'}) CREATE (c1)-[:COMPETES_WITH]->(c2)", {}) ] for query, params in relationships: self.conn.execute_query(query, params) return { "success": True, "message": "公司知识图谱创建成功", "data": { "companies": len(companies), "people": len(people), "products": len(products), "relationships": len(relationships) } } except Exception as e: return { "success": False, "message": f"构建公司知识图谱失败: {str(e)}" }
6. HTML模板
templates/base.html
html
<!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>{% block title %}知识图谱管理系统{% endblock %}</title> <!-- Bootstrap CSS --> <link href="https://cdn.jsdelivr.net/npm/[email protected]/dist/css/bootstrap.min.css" rel="stylesheet"> <!-- Font Awesome --> <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css"> <!-- 自定义样式 --> <style> :root { --primary-color: #3498db; --secondary-color: #2c3e50; --accent-color: #e74c3c; --light-bg: #f8f9fa; --dark-bg: #343a40; } body { font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; background-color: var(--light-bg); } .navbar-brand { font-weight: bold; color: var(--primary-color) !important; } .sidebar { min-height: calc(100vh - 56px); background-color: var(--secondary-color); color: white; } .sidebar a { color: rgba(255, 255, 255, 0.8); text-decoration: none; padding: 10px 15px; display: block; border-radius: 5px; margin: 5px 0; transition: all 0.3s; } .sidebar a:hover { background-color: rgba(255, 255, 255, 0.1); color: white; } .sidebar a.active { background-color: var(--primary-color); color: white; } .sidebar i { margin-right: 10px; width: 20px; text-align: center; } .card { border: none; box-shadow: 0 2px 10px rgba(0,0,0,0.1); margin-bottom: 20px; transition: transform 0.3s; } .card:hover { transform: translateY(-2px); } .card-header { background-color: var(--primary-color); color: white; font-weight: bold; } .stat-card { text-align: center; padding: 20px; background: linear-gradient(135deg, var(--primary-color), #2980b9); color: white; border-radius: 10px; } .stat-card i { font-size: 3rem; margin-bottom: 10px; } .stat-card .number { font-size: 2.5rem; font-weight: bold; } .stat-card .label { font-size: 1rem; opacity: 0.9; } .btn-primary { background-color: var(--primary-color); border-color: var(--primary-color); } .btn-primary:hover { background-color: #2980b9; border-color: #2980b9; } .table-hover tbody tr:hover { background-color: rgba(52, 152, 219, 0.1); } .alert { border: none; border-radius: 5px; } .alert-success { background-color: #d4edda; color: #155724; } .alert-error { background-color: #f8d7da; color: #721c24; } .cypher-editor { font-family: 'Courier New', monospace; min-height: 200px; } #graph-container { width: 100%; height: 600px; border: 1px solid #ddd; border-radius: 5px; background-color: white; } .node-tooltip { position: absolute; background: white; border: 1px solid #ddd; border-radius: 3px; padding: 10px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); display: none; z-index: 1000; max-width: 300px; } @media (max-width: 768px) { .sidebar { min-height: auto; margin-bottom: 20px; } } </style> {% block extra_css %}{% endblock %} </head> <body> <!-- 导航栏 --> <nav> <div> <a href="{{ url_for('index') }}"> <i></i> 知识图谱管理系统 </a> <button type="button"> <span></span> </button> <div> <ul> <li> <a href="{{ url_for('health') }}"> <i></i> 健康检查 </a> </li> <li> <span> <i></i> {% if neo4j_conn.is_connected() %} <span>数据库已连接</span> {% else %} <span>数据库未连接</span> {% endif %} </span> </li> </ul> </div> </div> </nav> <div> <div> <!-- 侧边栏 --> <div> <div> <ul> <li> <a href="{{ url_for('dashboard') }}"dashboard' %}active{% endif %}"> <i></i> 仪表盘 </a> </li> <li> <a href="{{ url_for('query') }}"query' %}active{% endif %}"> <i></i> 查询 </a> </li> <li> <a href="{{ url_for('visualize') }}"visualize' %}active{% endif %}"> <i></i> 可视化 </a> </li> <li> <a href="{{ url_for('manage') }}"manage' %}active{% endif %}"> <i></i> 管理 </a> </li> </ul> <hr> <h6> <span>数据库信息</span> </h6> <ul> <li> <a href="#"> <i></i> {{ neo4j_conn.uri }} </a> </li> <li> <a href="#"> <i></i> {{ neo4j_conn.user }} </a> </li> </ul> </div> </div> <!-- 主内容区域 --> <div> {% with messages = get_flashed_messages(with_categories=true) %} {% if messages %} {% for category, message in messages %} <div> {{ message }} </div> {% endfor %} {% endif %} {% endwith %} {% block content %}{% endblock %} </div> </div> </div> <!-- Bootstrap JS --> <script src="https://cdn.jsdelivr.net/npm/[email protected]/dist/js/bootstrap.bundle.min.js"></script> <!-- 全局JS --> <script> // 显示加载指示器 function showLoading() { const loadingEl = document.createElement('div'); loadingEl.id = 'loading'; loadingEl.innerHTML = ` <div> <div role="status"> <span>加载中...</span> </div> </div> `; document.body.appendChild(loadingEl); } // 隐藏加载指示器 function hideLoading() { const loadingEl = document.getElementById('loading'); if (loadingEl) { loadingEl.remove(); } } // 显示消息 function showMessage(message, type = 'info') { const alertDiv = document.createElement('div'); alertDiv.className = `alert alert-${type} alert-dismissible fade show`; alertDiv.innerHTML = ` ${message} <button type="button"></button> `; const container = document.querySelector('.container-fluid .col-md-9'); container.insertBefore(alertDiv, container.firstChild); setTimeout(() => { alertDiv.remove(); }, 5000); } // 处理表单提交 function handleFormSubmit(formId, successCallback) { const form = document.getElementById(formId); if (form) { form.addEventListener('submit', function(e) { e.preventDefault(); showLoading(); const formData = new FormData(this); const action = this.getAttribute('action'); const method = this.getAttribute('method') || 'POST'; fetch(action, { method: method, body: formData }) .then(response => response.json()) .then(data => { hideLoading(); if (data.success) { showMessage(data.message, 'success'); if (successCallback) successCallback(data); } else { showMessage(data.message, 'danger'); } }) .catch(error => { hideLoading(); showMessage('请求失败: ' + error.message, 'danger'); }); }); } } // 页面加载完成 document.addEventListener('DOMContentLoaded', function() { // 为所有带有 data-confirm 属性的链接添加确认对话框 document.querySelectorAll('a[data-confirm]').forEach(link => { link.addEventListener('click', function(e) { if (!confirm(this.getAttribute('data-confirm'))) { e.preventDefault(); } }); }); // 为所有表格行添加点击效果 document.querySelectorAll('table tbody tr').forEach(row => { row.addEventListener('click', function() { this.classList.toggle('table-active'); }); }); }); </script> {% block extra_js %}{% endblock %} </body> </html>
templates/dashboard.html
html
{% extends "base.html" %} {% block title %}仪表盘 - 知识图谱管理系统{% endblock %} {% block content %} <div> <h1> <i></i> 仪表盘 </h1> <div> <div> <button type="button" οnclick="refreshDashboard()"> <i></i> 刷新 </button> </div> </div> </div> <!-- 统计卡片 --> <div> <div> <div> <i></i> <div>0</div> <div>节点总数</div> </div> </div> <div> <div> <i></i> <div>0</div> <div>关系总数</div> </div> </div> <div> <div> <i></i> <div>0</div> <div>标签类型</div> </div> </div> <div> <div> <i></i> <div>0</div> <div>关系类型</div> </div> </div> </div> <div> <!-- 节点统计 --> <div> <div> <div> <i></i> 节点统计(按标签) </div> <div> <table> <thead> <tr> <th>标签</th> <th>数量</th> <th>百分比</th> </tr> </thead> <tbody> {% for stat in stats %} <tr> <td>{{ stat.label }}</td> <td>{{ stat.count }}</td> <td> <div> <div role="progressbar"count') or 1) }}%"> </div> </div> </td> </tr> {% endfor %} </tbody> </table> </div> </div> </div> <!-- 最近添加的节点 --> <div> <div> <div> <i></i> 最近添加的节点 </div> <div> <div> {% for node in recent_nodes %} <a href="#"> <div> <h6> <span>{{ node.label }}</span> {{ node.name }} </h6> <small>{{ node.created_at }}</small> </div> </a> {% endfor %} </div> </div> </div> </div> </div> <!-- 数据库信息 --> <div> <div> <i></i> 数据库信息 </div> <div> <div> {% for section, data in db_info.items() %} <div> <h2> <button type="button"> {{ section }} ({{ data|length }}) </button> </h2> <div> <div> {% if data is string %} <p>{{ data }}</p> {% elif data is mapping %} <pre>{{ data|tojson(indent=2) }}</pre> {% else %} <table> {% for item in data %} <tr> {% for key, value in item.items() %} <td><strong>{{ key }}:</strong></td> <td>{{ value }}</td> {% endfor %} </tr> {% endfor %} </table> {% endif %} </div> </div> </div> {% endfor %} </div> </div> </div> {% endblock %} {% block extra_js %} <script> // 刷新统计数据 function refreshDashboard() { showLoading(); fetch('/api/statistics') .then(response => response.json()) .then(data => { if (data.success) { updateStatistics(data.statistics); } hideLoading(); }) .catch(error => { console.error('Error:', error); hideLoading(); }); } // 更新统计数据显示 function updateStatistics(stats) { // 更新节点统计表 const nodeStatsBody = document.getElementById('node-stats'); if (nodeStatsBody && stats.nodes_by_label) { let totalNodes = 0; stats.nodes_by_label.forEach(item => { totalNodes += item.count; }); nodeStatsBody.innerHTML = stats.nodes_by_label.map(item => ` <tr> <td>${item.label}</td> <td>${item.count}</td> <td> <div> <div role="progressbar"> </div> </div> </td> </tr> `).join(''); // 更新统计卡片 document.getElementById('node-count').textContent = totalNodes; document.getElementById('label-count').textContent = stats.nodes_by_label.length; } // 更新关系统计 if (stats.relationships_by_type) { let totalRels = 0; stats.relationships_by_type.forEach(item => { totalRels += item.count; }); document.getElementById('rel-count').textContent = totalRels; document.getElementById('rel-type-count').textContent = stats.relationships_by_type.length; } } // 页面加载时获取统计数据 document.addEventListener('DOMContentLoaded', function() { refreshDashboard(); // 每30秒自动刷新 setInterval(refreshDashboard, 30000); }); </script> {% endblock %}
templates/query.html
html
{% extends "base.html" %} {% block title %}查询 - 知识图谱管理系统{% endblock %} {% block content %} <div> <h1> <i></i> Cypher 查询 </h1> <div> <div> <button type="button" οnclick="clearQuery()"> <i></i> 清空 </button> <button type="button" οnclick="executeQuery()"> <i></i> 执行 </button> </div> </div> </div> <div> <!-- 查询编辑器 --> <div> <div> <div> <i></i> 查询编辑器 </div> <div> <form method="POST"> <div> <textarea name="query" rows="8" placeholder="输入Cypher查询语句...">{{ query_text }}</textarea> </div> <div> <label>查询参数 (JSON格式):</label> <textarea name="params" rows="3" placeholder='{"name": "Alice"}'></textarea> </div> <button type="submit"> <i></i> 执行查询 </button> <button type="button" οnclick="explainQuery()"> <i></i> 解释执行计划 </button> </form> </div> </div> </div> <!-- 查询模板 --> <div> <div> <div> <i></i> 查询模板 </div> <div> <div> {% for template in query_templates %} <a href="#" οnclick="loadTemplate('{{ template.query }}')"> <div> <h6>{{ template.name }}</h6> </div> <small>{{ template.query|truncate(50) }}</small> </a> {% endfor %} </div> <div> <h6>常用操作:</h6> <div> <button οnclick="loadTemplate('MATCH (n) RETURN n LIMIT 10')"> <i></i> 查看所有节点 </button> <button οnclick="loadTemplate('MATCH ()-[r]->() RETURN r LIMIT 10')"> <i></i> 查看所有关系 </button> <button οnclick="loadTemplate('CALL db.labels()')"> <i></i> 查看所有标签 </button> <button οnclick="loadTemplate('CALL db.relationshipTypes()')"> <i></i> 查看所有关系类型 </button> </div> </div> </div> </div> </div> </div> <!-- 查询结果 --> {% if results %} <div> <div> <div> <i></i> 查询结果 <span>{{ results|length }} 条记录</span> </div> <div> <button οnclick="exportResults()"> <i></i> 导出为JSON </button> </div> </div> <div> <div> {% if results %} {% set headers = results[0].keys() %} <table> <thead> <tr> {% for header in headers %} <th>{{ header }}</th> {% endfor %} </tr> </thead> <tbody> {% for row in results %} <tr> {% for header in headers %} <td> {% if row[header] is mapping or row[header] is iterable and row[header] is not string %} <button οnclick="showDetail('{{ row[header]|tojson|escape }}')"> 查看详情 </button> {% else %} {{ row[header] }} {% endif %} </td> {% endfor %} </tr> {% endfor %} </tbody> </table> {% else %} <div> 没有查询到数据 </div> {% endif %} </div> </div> </div> {% endif %} <!-- 执行计划结果模态框 --> <div tabindex="-1"> <div> <div> <div> <h5>执行计划</h5> <button type="button"></button> </div> <div> <pre></pre> </div> </div> </div> </div> <!-- 详情模态框 --> <div tabindex="-1"> <div> <div> <div> <h5>详情</h5> <button type="button"></button> </div> <div> <pre></pre> </div> </div> </div> </div> {% endblock %} {% block extra_js %} <script> // 加载查询模板 function loadTemplate(query) { document.getElementById('query').value = query; } // 清空查询 function clearQuery() { document.getElementById('query').value = ''; document.getElementById('params').value = ''; } // 执行查询 function executeQuery() { document.getElementById('queryForm').submit(); } // 解释执行计划 function explainQuery() { const query = document.getElementById('query').value; if (!query.trim()) { alert('请输入查询语句'); return; } showLoading(); const explainQuery = 'EXPLAIN ' + query; const params = document.getElementById('params').value; let paramObj = {}; try { if (params.trim()) { paramObj = JSON.parse(params); } } catch (e) { alert('参数格式错误: ' + e.message); hideLoading(); return; } fetch('/query', { method: 'POST', headers: { 'Content-Type': 'application/json', }, body: JSON.stringify({ query: explainQuery, params: paramObj, explain: true }) }) .then(response => response.json()) .then(data => { hideLoading(); if (data.success) { const explainResult = document.getElementById('explainResult'); explainResult.textContent = JSON.stringify(data.result, null, 2); const modal = new bootstrap.Modal(document.getElementById('explainModal')); modal.show(); } else { alert('错误: ' + data.message); } }) .catch(error => { hideLoading(); alert('请求失败: ' + error.message); }); } // 显示详情 function showDetail(content) { const detailContent = document.getElementById('detailContent'); try { const parsed = JSON.parse(content); detailContent.textContent = JSON.stringify(parsed, null, 2); } catch (e) { detailContent.textContent = content; } const modal = new bootstrap.Modal(document.getElementById('detailModal')); modal.show(); } // 导出结果为JSON function exportResults() { const results = {{ results|tojson|safe }}; const dataStr = JSON.stringify(results, null, 2); const dataUri = 'data:application/json;charset=utf-8,'+ encodeURIComponent(dataStr); const exportFileDefaultName = `query_results_${new Date().toISOString().slice(0,10)}.json`; const linkElement = document.createElement('a'); linkElement.setAttribute('href', dataUri); linkElement.setAttribute('download', exportFileDefaultName); linkElement.click(); } // 添加语法高亮(简化版) document.addEventListener('DOMContentLoaded', function() { const textarea = document.getElementById('query'); if (textarea) { textarea.addEventListener('keydown', function(e) { if (e.key === 'Tab') { e.preventDefault(); const start = this.selectionStart; const end = this.selectionEnd; // 插入制表符 this.value = this.value.substring(0, start) + ' ' + this.value.substring(end); // 移动光标位置 this.selectionStart = this.selectionEnd = start + 2; } }); } // 自动调整textarea高度 function autoResize(textarea) { textarea.style.height = 'auto'; textarea.style.height = (textarea.scrollHeight) + 'px'; } if (textarea) { textarea.addEventListener('input', function() { autoResize(this); }); autoResize(textarea); // 初始调整 } }); </script> {% endblock %}
7. 可视化模块
utils/visualizer.py
python
import networkx as nx import matplotlib.pyplot as plt import plotly.graph_objects as go from pyvis.network import Network import json import os class KGVisualizer: """知识图谱可视化器""" def __init__(self, neo4j_connector): self.conn = neo4j_connector def create_networkx_graph(self, limit=100): """创建NetworkX图""" G = nx.Graph() query = f""" MATCH (n)-[r]->(m) RETURN id(n) as source_id, labels(n)[0] as source_label, n.name as source_name, type(r) as relation_type, id(m) as target_id, labels(m)[0] as target_label, m.name as target_name LIMIT {limit} """ results = self.conn.execute_query(query) for record in results: source_id = record['source_id'] target_id = record['target_id'] # 添加节点 if source_id not in G: G.add_node( source_id, label=record['source_label'], name=record['source_name'], size=10, color=self._get_color(record['source_label']) ) if target_id not in G: G.add_node( target_id, label=record['target_label'], name=record['target_name'], size=10, color=self._get_color(record['target_label']) ) # 添加边 G.add_edge( source_id, target_id, type=record['relation_type'], weight=1 ) return G def _get_color(self, label): """根据标签获取颜色""" color_map = { "Person": "#FF6B6B", "Company": "#4ECDC4", "Movie": "#45B7D1", "Product": "#96CEB4", "Disease": "#FFEAA7", "Symptom": "#DDA0DD", "Drug": "#98D8C8", "Location": "#F7DC6F" } return color_map.get(label, "#AAAAAA") def create_pyvis_network(self, output_file="graph.html", limit=200): """创建Pyvis交互式网络图""" net = Network(,,, font_color="white", directed=True ) query = f""" MATCH (n)-[r]->(m) RETURN id(n) as source_id, labels(n)[0] as source_label, n.name as source_name, type(r) as relation_type, properties(r) as rel_props, id(m) as target_id, labels(m)[0] as target_label, m.name as target_name LIMIT {limit} """ results = self.conn.execute_query(query) added_nodes = set() for record in results: source_id = record['source_id'] target_id = record['target_id'] # 添加源节点 if source_id not in added_nodes: net.add_node( source_id, label=record['source_name'] or f"{record['source_label']}_{source_id}", title=f""" 标签: {record['source_label']}<br> ID: {source_id}<br> {self._format_properties(record.get('source_props', {}))} """, color=self._get_color(record['source_label']), size=15 ) added_nodes.add(source_id) # 添加目标节点 if target_id not in added_nodes: net.add_node( target_id, label=record['target_name'] or f"{record['target_label']}_{target_id}", title=f""" 标签: {record['target_label']}<br> ID: {target_id}<br> {self._format_properties(record.get('target_props', {}))} """, color=self._get_color(record['target_label']), size=15 ) added_nodes.add(target_id) # 添加边 rel_title = f"类型: {record['relation_type']}<br>" if record.get('rel_props'): rel_title += self._format_properties(record['rel_props']) net.add_edge( source_id, target_id, title=rel_title, label=record['relation_type'], color="#888888", width=2 ) # 设置物理布局 net.set_options(""" var options = { "physics": { "barnesHut": { "gravitationalConstant": -80000, "centralGravity": 0.3, "springLength": 95, "springConstant": 0.04, "damping": 0.09, "avoidOverlap": 0 }, "minVelocity": 0.75, "solver": "barnesHut" } } """) # 保存为HTML文件 net.save_graph(output_file) return output_file def _format_properties(self, props): """格式化属性显示""" if not props: return "" lines = [] for key, value in props.items(): if isinstance(value, str): value = value[:50] + "..." if len(value) > 50 else value lines.append(f"{key}: {value}") return "<br>".join(lines) def create_plotly_visualization(self, limit=100): """创建Plotly可视化""" G = self.create_networkx_graph(limit) if not G.nodes(): return None # 获取节点位置(使用spring布局) pos = nx.spring_layout(G, k=1, iterations=50) # 创建边迹 edge_x = [] edge_y = [] edge_text = [] for edge in G.edges(data=True): x0, y0 = pos[edge[0]] x1, y1 = pos[edge[1]] edge_x.append(x0) edge_x.append(x1) edge_x.append(None) edge_y.append(y0) edge_y.append(y1) edge_y.append(None) edge_text.append(edge[2].get('type', '')) edge_trace = go.Scatter( x=edge_x, y=edge_y, line=dict(width=0.5, color='#888'), hoverinfo='none', mode='lines' ) # 创建节点迹 node_x = [] node_y = [] node_text = [] node_color = [] node_size = [] for node in G.nodes(data=True): x, y = pos[node[0]] node_x.append(x) node_y.append(y) node_data = node[1] label = node_data.get('label', 'Node') name = node_data.get('name', f'Node_{node[0]}') node_text.append(f"{label}: {name}") node_color.append(node_data.get('color', '#AAAAAA')) node_size.append(node_data.get('size', 10)) node_trace = go.Scatter( x=node_x, y=node_y, mode='markers', hoverinfo='text', marker=dict( size=node_size, color=node_color, line_width=2 ), text=node_text ) # 创建图 fig = go.Figure(data=[edge_trace, node_trace], layout=go.Layout( title='知识图谱可视化', titlefont_size=16, showlegend=False, hovermode='closest', margin=dict(b=20, l=5, r=5, t=40), xaxis=dict(showgrid=False, zeroline=False, showticklabels=False), yaxis=dict(showgrid=False, zeroline=False, showticklabels=False) )) return fig.to_html(full_html=False) def export_graph_json(self, filename="graph_data.json"): """导出图为JSON格式"""" MATCH (n)-[r]->(m) RETURN id(n) as source_id, labels(n) as source_labels, properties(n) as source_properties, type(r) as relationship_type, properties(r) as relationship_properties, id(m) as target_id, labels(m) as target_labels, properties(m) as target_properties LIMIT 500 """ results = self.conn.execute_query(query) # 构建图数据 nodes = {} edges = [] for record in results: # 处理源节点 source_id = record['source_id'] if source_id not in nodes: nodes[source_id] = { "id": source_id, "labels": record['source_labels'], "properties": record['source_properties'] } # 处理目标节点 target_id = record['target_id'] if target_id not in nodes: nodes[target_id] = { "id": target_id, "labels": record['target_labels'], "properties": record['target_properties'] } # 添加边 edges.append({ "source": source_id, "target": target_id, "type": record['relationship_type'], "properties": record['relationship_properties'] }) graph_data = { "nodes": list(nodes.values()), "edges": edges, "metadata": { "node_count": len(nodes), "edge_count": len(edges) } } # 保存到文件 with open(filename, 'w', encoding='utf-8') as f: json.dump(graph_data, f, ensure_ascii=False, indent=2) return filename
8. 运行应用
requirements.txt
text
flask==2.3.2 neo4j==5.14.0 py2neo==2021.2.3 pandas==2.0.3 python-dotenv==1.0.0 flask-cors==4.0.0 networkx==3.1 matplotlib==3.7.2 plotly==5.15.0 pyvis==0.3.2
运行步骤:
- 安装依赖:
bash
pip install -r requirements.txt
- 配置Neo4j:
- 安装并启动Neo4j Desktop
- 修改默认密码
- 更新
.env文件中的配置
- 启动应用:
bash
python app.py
- 访问应用:
- 打开浏览器访问
http://localhost:5000 - 查看仪表盘、执行查询、可视化图谱
- 打开浏览器访问
9. 功能特点
- 完整的CRUD操作:支持节点和关系的创建、读取、更新、删除
- 强大的查询功能:支持Cypher查询、查询模板、执行计划
- 交互式可视化:使用Pyvis和Plotly实现图谱可视化
- 数据导入导出:支持CSV/JSON格式的数据导入导出
- 用户友好的界面:Bootstrap响应式设计,支持移动设备
- 实时统计:实时显示数据库统计信息
- 健康检查:监控数据库连接状态
- 示例数据:内置电影和公司知识图谱示例