""" Neo4j 实战:社交网络系统
功能:实现用户管理、关注关系、帖子发布、好友推荐等
"""
import os
import logging
from datetime import datetime
from typing import List, Dict, Any, Optional
from dotenv import load_dotenv
from neo4j import GraphDatabase, Record
# Root logger: INFO and above, each record prefixed with timestamp,
# logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Load variables from a .env file (if any) into the environment so the
# NEO4J_* lookups done via os.getenv can see them.
load_dotenv()
class Neo4jSocialNetwork:
    """Neo4j-backed social network client.

    Wraps the Cypher queries for users, follow relationships, posts, likes
    and simple graph analytics (recommendations, shortest path, degree
    counts) behind methods that return plain dicts and lists.

    The instance owns a driver: call :meth:`close` when finished, or use the
    instance as a context manager (``with Neo4jSocialNetwork() as neo: ...``).
    """

    def __init__(self, uri: Optional[str] = None, user: Optional[str] = None,
                 password: Optional[str] = None):
        """Create the driver and verify that the server is reachable.

        Args:
            uri: Connection URI. Falls back to the ``NEO4J_URI`` environment
                variable, then ``bolt://localhost:7687``.
            user: User name. Falls back to ``NEO4J_USER``, then ``neo4j``.
            password: Password. Falls back to ``NEO4J_PASSWORD``, then a
                built-in default.

        Raises:
            Exception: Whatever the driver raises when the connectivity
                check fails; the driver is closed before re-raising.
        """
        self.uri = uri or os.getenv("NEO4J_URI", "bolt://localhost:7687")
        self.user = user or os.getenv("NEO4J_USER", "neo4j")
        # NOTE(review): the hard-coded fallback password is kept for
        # backward compatibility; set NEO4J_PASSWORD outside local demos.
        self.password = password or os.getenv("NEO4J_PASSWORD", "password123")
        self.driver = GraphDatabase.driver(self.uri, auth=(self.user, self.password))
        try:
            self._verify_connectivity()
        except Exception:
            # Do not leak the driver when construction fails.
            self.driver.close()
            raise
        logger.info(f"已连接到 Neo4j: {self.uri}")

    def __enter__(self):
        """Return ``self`` so the client can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the driver when leaving the ``with`` block."""
        self.close()

    def _verify_connectivity(self):
        """Check the connection, logging and re-raising any failure."""
        try:
            self.driver.verify_connectivity()
        except Exception as e:
            logger.error(f"连接失败:{e}")
            raise

    def close(self):
        """Close the driver."""
        self.driver.close()
        logger.info("Neo4j 连接已关闭")

    def _run(self, query: str, parameters: Optional[Dict] = None) -> List["Record"]:
        """Run one statement in its own session and return all records.

        The records are materialised into a list before the session is
        closed, so callers can use them freely afterwards.
        """
        with self.driver.session() as session:
            return list(session.run(query, parameters or {}))

    def _execute_read(self, query: str, parameters: Optional[Dict] = None) -> List["Record"]:
        """Run a read-only query.

        Args:
            query: Cypher statement.
            parameters: Query parameters (``None`` means no parameters).

        Returns:
            List of result records.
        """
        return self._run(query, parameters)

    def _execute_write(self, query: str, parameters: Optional[Dict] = None) -> List["Record"]:
        """Run a write query.

        Args:
            query: Cypher statement.
            parameters: Query parameters (``None`` means no parameters).

        Returns:
            List of result records.
        """
        return self._run(query, parameters)

    @staticmethod
    def _validated_depth(max_depth: int) -> int:
        """Return ``max_depth`` if it is a positive int, else raise.

        Variable-length bounds cannot be Cypher parameters, so the value is
        inlined into the query text; strict validation keeps that safe.

        Raises:
            ValueError: If ``max_depth`` is not an ``int`` >= 1.
        """
        if isinstance(max_depth, bool) or not isinstance(max_depth, int) or max_depth < 1:
            raise ValueError(f"max_depth must be a positive integer, got {max_depth!r}")
        return max_depth

    def create_user(self, user_id: str, name: str, email: str,
                    age: Optional[int] = None) -> Optional[Dict]:
        """Create a new ``User`` node with reputation 0.

        Args:
            user_id: Unique user identifier.
            name: Display name.
            email: E-mail address.
            age: Age, optional.

        Returns:
            The created user as a dict (``created_at`` as a string), or
            ``None`` if the query returned no record.

        Raises:
            Exception: Any driver error, after it has been logged.
        """
        query = """
            CREATE (u:User {
                user_id: $user_id, name: $name, email: $email, age: $age,
                created_at: datetime(), reputation: 0 })
            RETURN u { .user_id, .name, .email, .age, .reputation,
                       created_at: toString(u.created_at) } as user
        """
        params = {"user_id": user_id, "name": name, "email": email, "age": age}
        try:
            records = self._execute_write(query, params)
        except Exception as e:
            logger.error(f"创建用户失败:{e}")
            raise
        if not records:
            return None
        user = records[0].get("user")
        logger.info(f"创建用户成功:{user}")
        return user

    def get_user(self, user_id: str) -> Optional[Dict]:
        """Look up a user by id.

        Args:
            user_id: User identifier.

        Returns:
            The user dict, or ``None`` when no user matches.
        """
        query = """
            MATCH (u:User {user_id: $user_id})
            RETURN u { .user_id, .name, .email, .age, .reputation,
                       created_at: toString(u.created_at) } as user
        """
        records = self._execute_read(query, {"user_id": user_id})
        return records[0].get("user") if records else None

    def search_users(self, name_pattern: Optional[str] = None,
                     min_age: Optional[int] = None) -> List[Dict]:
        """Search users, ordered by reputation (highest first, at most 50).

        Args:
            name_pattern: Substring the name must contain; empty or ``None``
                disables the filter.
            min_age: Minimum age (inclusive); ``None`` disables the filter.
                ``0`` is a valid bound and is applied.

        Returns:
            Matching user dicts.
        """
        conditions: List[str] = []
        params: Dict[str, Any] = {}
        if name_pattern:
            conditions.append("u.name CONTAINS $name_pattern")
            params["name_pattern"] = name_pattern
        # Compare against None so that min_age=0 is not silently ignored.
        if min_age is not None:
            conditions.append("u.age >= $min_age")
            params["min_age"] = min_age
        where_clause = " WHERE " + " AND ".join(conditions) if conditions else ""
        query = "MATCH (u:User)" + where_clause + """
            RETURN u { .user_id, .name, .email, .age, .reputation,
                       created_at: toString(u.created_at) } as user
            ORDER BY u.reputation DESC LIMIT 50
        """
        records = self._execute_read(query, params)
        return [record.get("user") for record in records]

    def follow_user(self, follower_id: str, followee_id: str) -> bool:
        """Make ``follower_id`` follow ``followee_id`` (idempotent via MERGE).

        Args:
            follower_id: Id of the following user.
            followee_id: Id of the user being followed.

        Returns:
            ``True`` when the relationship exists afterwards, ``False`` when
            either user was not found.
        """
        query = """
            MATCH (follower:User {user_id: $follower_id})
            MATCH (followee:User {user_id: $followee_id})
            MERGE (follower)-[r:FOLLOWS]->(followee)
            ON CREATE SET r.created_at = datetime()
            RETURN r IS NOT NULL as followed
        """
        params = {"follower_id": follower_id, "followee_id": followee_id}
        records = self._execute_write(query, params)
        return records[0].get("followed", False) if records else False

    def unfollow_user(self, follower_id: str, followee_id: str) -> bool:
        """Remove the FOLLOWS relationship from follower to followee.

        Args:
            follower_id: Id of the following user.
            followee_id: Id of the user being followed.

        Returns:
            ``True`` if at least one relationship was deleted.
        """
        query = """
            MATCH (follower:User {user_id: $follower_id})-[r:FOLLOWS]->(followee:User {user_id: $followee_id})
            DELETE r RETURN COUNT(r) > 0 as unfollowed
        """
        params = {"follower_id": follower_id, "followee_id": followee_id}
        records = self._execute_write(query, params)
        return records[0].get("unfollowed", False) if records else False

    def get_followers(self, user_id: str, limit: int = 20) -> List[Dict]:
        """List the users following ``user_id``, by reputation descending.

        Args:
            user_id: User identifier.
            limit: Maximum number of rows.

        Returns:
            Follower dicts with ``user_id``, ``name`` and ``reputation``.
        """
        query = """
            MATCH (follower:User)-[:FOLLOWS]->(u:User {user_id: $user_id})
            RETURN follower { .user_id, .name, .reputation } as follower
            ORDER BY follower.reputation DESC LIMIT $limit
        """
        records = self._execute_read(query, {"user_id": user_id, "limit": limit})
        return [record.get("follower") for record in records]

    def get_following(self, user_id: str, limit: int = 20) -> List[Dict]:
        """List the users that ``user_id`` follows, by reputation descending.

        Args:
            user_id: User identifier.
            limit: Maximum number of rows.

        Returns:
            Followee dicts with ``user_id``, ``name`` and ``reputation``.
        """
        query = """
            MATCH (u:User {user_id: $user_id})-[:FOLLOWS]->(followee:User)
            RETURN followee { .user_id, .name, .reputation } as followee
            ORDER BY followee.reputation DESC LIMIT $limit
        """
        records = self._execute_read(query, {"user_id": user_id, "limit": limit})
        return [record.get("followee") for record in records]

    def recommend_friends(self, user_id: str, limit: int = 10) -> List[Dict]:
        """Recommend users followed by the people ``user_id`` follows.

        Algorithm:
            - take the users the target follows;
            - take the users *those* users follow (second degree);
            - drop the target itself and anyone the target already follows;
            - rank by how many of the target's followees follow the
              candidate, then by reputation.

        Args:
            user_id: User identifier.
            limit: Maximum number of recommendations.

        Returns:
            Candidate dicts; ``common_followers`` holds the ranking count.
        """
        query = """
            MATCH (target:User {user_id: $user_id})-[:FOLLOWS]->(followed:User)
            MATCH (followed)-[:FOLLOWS]->(recommend:User)
            WHERE NOT (target)-[:FOLLOWS]->(recommend) AND target <> recommend
            WITH recommend, COUNT(DISTINCT followed) as common_followers
            RETURN recommend { .user_id, .name, .reputation, common_followers: common_followers } as user
            ORDER BY common_followers DESC, recommend.reputation DESC LIMIT $limit
        """
        records = self._execute_read(query, {"user_id": user_id, "limit": limit})
        return [record.get("user") for record in records]

    def get_mutual_followers(self, user_a: str, user_b: str) -> List[Dict]:
        """List users who follow both ``user_a`` and ``user_b``.

        Args:
            user_a: First user id.
            user_b: Second user id.

        Returns:
            Follower dicts ordered by reputation descending.
        """
        query = """
            MATCH (follower:User)-[:FOLLOWS]->(a:User {user_id: $user_a})
            WHERE (follower)-[:FOLLOWS]->(:User {user_id: $user_b})
            RETURN follower { .user_id, .name, .reputation } as follower
            ORDER BY follower.reputation DESC
        """
        records = self._execute_read(query, {"user_a": user_a, "user_b": user_b})
        return [record.get("follower") for record in records]

    def create_post(self, post_id: str, author_id: str, content: str,
                    tags: Optional[List[str]] = None) -> Dict:
        """Create a post, link it to its author and attach tags.

        All statements run in one explicit transaction, so a failure while
        attaching tags does not leave a half-created post behind.

        Args:
            post_id: Post identifier.
            author_id: Id of the authoring user.
            content: Post text.
            tags: Optional tag names; repeated names are attached once.

        Returns:
            The created post as a dict, including author info and ``tags``
            (an empty list when no tags were given).

        Raises:
            RuntimeError: If no user matches ``author_id`` (nothing is
                created in that case).
        """
        create_query = """
            MATCH (author:User {user_id: $author_id})
            CREATE (p:Post { post_id: $post_id, content: $content, created_at: datetime(),
                             likes_count: 0, comments_count: 0 })
            CREATE (author)-[:POSTED]->(p)
            RETURN p.post_id as post_id
        """
        # MERGE on both the tag and the relationship keeps repeated tag names
        # from producing duplicate HAS_TAG edges.
        tag_query = """
            MATCH (p:Post {post_id: $post_id})
            UNWIND $tags as tag_name
            MERGE (t:Tag {name: tag_name})
            MERGE (p)-[:HAS_TAG]->(t)
        """
        final_query = """
            MATCH (p:Post {post_id: $post_id})
            OPTIONAL MATCH (p)-[:HAS_TAG]->(t:Tag)
            WITH p, COLLECT(t.name) as tags
            MATCH (author)-[:POSTED]->(p)
            RETURN p { .post_id, .content, .likes_count, .comments_count,
                       created_at: toString(p.created_at),
                       author_id: author.user_id, author_name: author.name, tags: tags } as post
        """
        params = {"post_id": post_id, "author_id": author_id, "content": content}
        with self.driver.session() as session:
            # Leaving the block without commit() (e.g. on an exception)
            # discards everything done so far.
            with session.begin_transaction() as tx:
                if tx.run(create_query, params).single() is None:
                    raise RuntimeError("帖子创建失败")
                if tags:
                    tx.run(tag_query, {"post_id": post_id, "tags": list(tags)}).consume()
                post = tx.run(final_query, {"post_id": post_id}).single().get("post")
                tx.commit()
        logger.info(f"创建帖子成功:{post.get('post_id')}")
        return post

    def get_timeline(self, user_id: str, limit: int = 30) -> List[Dict]:
        """Return posts written by the users that ``user_id`` follows.

        Args:
            user_id: User identifier.
            limit: Maximum number of posts.

        Returns:
            Post dicts (with author info and ``tags``), newest first.
        """
        query = """
            MATCH (user:User {user_id: $user_id})-[:FOLLOWS]->(followed:User)
            MATCH (followed)-[:POSTED]->(p:Post)
            OPTIONAL MATCH (p)-[:HAS_TAG]->(t:Tag)
            WITH p, followed, COLLECT(t.name) as tags
            RETURN p { .post_id, .content, .likes_count, .comments_count,
                       created_at: toString(p.created_at),
                       author_id: followed.user_id, author_name: followed.name, tags: tags } as post
            ORDER BY p.created_at DESC LIMIT $limit
        """
        records = self._execute_read(query, {"user_id": user_id, "limit": limit})
        return [record.get("post") for record in records]

    def like_post(self, user_id: str, post_id: str) -> bool:
        """Record that ``user_id`` likes ``post_id``.

        The like counter is only incremented when the LIKES relationship is
        newly created, so repeated calls do not inflate it.

        Args:
            user_id: User identifier.
            post_id: Post identifier.

        Returns:
            ``True`` when the like exists afterwards, ``False`` when the
            user or the post was not found.
        """
        query = """
            MATCH (user:User {user_id: $user_id})
            MATCH (p:Post {post_id: $post_id})
            MERGE (user)-[l:LIKES]->(p)
            ON CREATE SET p.likes_count = p.likes_count + 1, l.created_at = datetime()
            RETURN l IS NOT NULL as liked
        """
        records = self._execute_write(query, {"user_id": user_id, "post_id": post_id})
        return records[0].get("liked", False) if records else False

    def find_shortest_path(self, from_user: str, to_user: str,
                           max_depth: int = 6) -> Optional[List[Dict]]:
        """Find the shortest FOLLOWS path between two users.

        Relationship direction is ignored.

        Args:
            from_user: Id of the starting user.
            to_user: Id of the target user.
            max_depth: Maximum number of hops; must be a positive int.

        Returns:
            Ordered list of ``{user_id, name}`` dicts along the path, or
            ``None`` when no path (or no such user) exists.

        Raises:
            ValueError: If ``max_depth`` is not a positive int.
        """
        depth = self._validated_depth(max_depth)
        if from_user == to_user:
            # shortestPath() rejects identical start/end nodes under the
            # server's default settings, so answer the trivial case directly.
            records = self._execute_read(
                "MATCH (u:User {user_id: $user_id}) "
                "RETURN [u {.user_id, .name}] as node_path",
                {"user_id": from_user},
            )
        else:
            # The hop bound must be a literal: Cypher does not accept a
            # parameter inside a variable-length pattern.
            query = (
                "MATCH path = shortestPath("
                "(src:User {user_id: $from_user})"
                "-[:FOLLOWS*.." + str(depth) + "]-"
                "(dst:User {user_id: $to_user})) "
                "WITH [node IN nodes(path) | node {.user_id, .name}] as node_path "
                "RETURN node_path"
            )
            records = self._execute_read(query, {"from_user": from_user, "to_user": to_user})
        if records and records[0].get("node_path"):
            return records[0].get("node_path")
        return None

    def get_friend_degrees(self, user_id: str, max_depth: int = 3) -> Dict[int, int]:
        """Count how many users sit at each degree of separation.

        Each reachable user is counted once, at the length of the shortest
        FOLLOWS path (direction ignored) that connects it to ``user_id``.

        Args:
            user_id: User identifier.
            max_depth: Maximum degree to explore; must be a positive int.

        Returns:
            Mapping ``{degree: number_of_users}``.

        Raises:
            ValueError: If ``max_depth`` is not a positive int.
        """
        depth = self._validated_depth(max_depth)
        # MIN(length(path)) per user already collapses longer alternative
        # paths, so no extra exclusion filter is needed. The hop bound is
        # inlined because it cannot be a Cypher parameter.
        query = (
            "MATCH path = (user:User {user_id: $user_id})"
            "-[:FOLLOWS*1.." + str(depth) + "]-(other:User) "
            "WHERE user <> other "
            "WITH other, MIN(length(path)) as degree "
            "RETURN degree, COUNT(DISTINCT other) as count ORDER BY degree"
        )
        records = self._execute_read(query, {"user_id": user_id})
        return {record.get("degree"): record.get("count") for record in records}
def demo_social_network():
    """Walk through the client's features against a live database.

    Creates five sample users, wires up follow relationships, publishes
    three tagged posts, adds likes, then prints friend recommendations,
    a timeline, a shortest follow path and a degree-of-separation summary.
    The connection is closed even when a step fails.
    """
    banner = "=" * 60
    logger.info(banner)
    logger.info("Neo4j 社交网络演示")
    logger.info(banner)

    neo = Neo4jSocialNetwork()
    try:
        # Step 1: sample users.
        logger.info("\n1. 创建测试用户...")
        users = [
            ("alice", "Alice Chen", "[email protected]", 28),
            ("bob", "Bob Wang", "[email protected]", 32),
            ("charlie", "Charlie Liu", "[email protected]", 25),
            ("diana", "Diana Zhang", "[email protected]", 30),
            ("eve", "Eve Li", "[email protected]", 27),
        ]
        created_users = []
        for profile in users:
            user = neo.create_user(*profile)
            created_users.append(user)
            print(f" 创建用户:{user['name']} ({user['user_id']})")

        # Step 2: follow graph. Each group is followed by its summary line.
        logger.info("\n2. 建立关注关系...")
        follow_groups = [
            ("alice", ("bob", "charlie"), " Alice 关注了 Bob 和 Charlie"),
            ("bob", ("charlie", "diana"), " Bob 关注了 Charlie 和 Diana"),
            ("charlie", ("diana",), " Charlie 关注了 Diana"),
            ("diana", ("alice",), " Diana 关注了 Alice"),
            ("eve", ("alice", "bob", "charlie", "diana"), " Eve 关注了所有人"),
        ]
        for follower, followees, summary in follow_groups:
            for followee in followees:
                neo.follow_user(follower, followee)
            print(summary)

        # Step 3: posts with tags.
        logger.info("\n3. 创建测试帖子...")
        post1 = neo.create_post(
            "post1", "alice", "图数据库真的很强大!今天学习了 Neo4j。",
            tags=["neo4j", "database", "learning"],
        )
        print(f" Alice 发布:{post1['content'][:20]}... 标签:{post1.get('tags', [])}")
        post2 = neo.create_post(
            "post2", "bob", "使用 Cypher 查询语言比 SQL 更直观,特别是处理关系数据。",
            tags=["cypher", "graph"],
        )
        print(f" Bob 发布:{post2['content'][:20]}... 标签:{post2.get('tags', [])}")
        post3 = neo.create_post(
            "post3", "charlie", "推荐大家看《图数据库实战》这本书,收获很大!",
            tags=["book", "recommendation"],
        )
        print(f" Charlie 发布:{post3['content'][:20]}... 标签:{post3.get('tags', [])}")

        # Step 4: likes.
        logger.info("\n4. 点赞互动...")
        likes = (
            ("alice", "post2"),
            ("charlie", "post2"),
            ("diana", "post2"),
            ("alice", "post3"),
            ("bob", "post3"),
        )
        for liker, liked_post in likes:
            neo.like_post(liker, liked_post)
        print(" Bob 的帖子获得了 3 个点赞")
        print(" Charlie 的帖子获得了 2 个点赞")

        # Step 5: recommendations.
        logger.info("\n5. 为 Alice 推荐好友...")
        recommendations = neo.recommend_friends("alice")
        print(" 基于共同关注的推荐结果:")
        for i, rec in enumerate(recommendations, 1):
            print(f" {i}. {rec['name']} (共同关注:{rec['common_followers']})")

        # Step 6: timeline.
        logger.info("\n6. Alice 的时间线...")
        for i, post in enumerate(neo.get_timeline("alice"), 1):
            print(f" {i}. {post['author_name']}: {post['content'][:30]}...")

        # Step 7: shortest path.
        logger.info("\n7. Alice 到 Diana 的关注路径...")
        path = neo.find_shortest_path("alice", "diana")
        if path:
            path_str = " -> ".join(node['name'] for node in path)
            print(f" 最短路径:{path_str}")

        # Step 8: degree statistics.
        logger.info("\n8. Eve 的关系度数统计...")
        degrees = neo.get_friend_degrees("eve", max_depth=3)
        for degree, count in degrees.items():
            print(f" {degree}度关系:{count}人")
    finally:
        neo.close()

    logger.info("\n" + banner)
    logger.info("演示完成")
    logger.info(banner)
# Script entry point: run the demo only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    demo_social_network()