import ksycopg2
from datetime import datetime
class KingbaseESManager:
    """Small convenience wrapper around a single ksycopg2 connection.

    The manager stores the connection parameters, opens/closes one
    connection on demand and offers helpers for running parameterized SQL
    against the ``user_info`` table.

    Errors are reported by printing a message and returning a sentinel
    (``False`` / ``None`` / ``-1``) instead of raising, so callers must check
    return values.
    """

    def __init__(self, dbname, user, password, host, port):
        """Remember the connection parameters; no connection is opened here.

        Args:
            dbname: Name of the database to connect to.
            user: Database user name.
            password: Password for ``user``.
            host: Server host name or address.
            port: Server port.
        """
        self.conn = None  # set by connect(), reset to None by disconnect()
        self.db_params = {
            "database": dbname,
            "user": user,
            "password": password,
            "host": host,
            "port": port,
        }

    def connect(self) -> bool:
        """Open the database connection.

        Returns:
            True when the connection was established, False otherwise (the
            error is printed).
        """
        try:
            self.conn = ksycopg2.connect(**self.db_params)
        except Exception as e:
            print(f"连接数据库失败:{e}")
            return False
        print("数据库连接成功")
        return True

    def disconnect(self):
        """Close the connection if one is open and forget the handle."""
        if self.conn is None:
            return
        try:
            self.conn.close()
        finally:
            # Drop the reference even if close() fails so later calls see
            # "not connected" instead of operating on a dead connection.
            self.conn = None
        print("数据库连接已关闭")

    def _rollback_quietly(self):
        """Roll back the current transaction without raising.

        Used from error handlers, where a second exception would hide the
        original failure and break the "return a sentinel" contract.
        """
        try:
            self.conn.rollback()
        except Exception as e:
            print(f"回滚失败:{e}")

    def execute_query(self, sql, params=None):
        """Run a query and return all of its rows.

        Args:
            sql: SQL text, using ``%s`` placeholders for parameters.
            params: Optional sequence of values bound to the placeholders.

        Returns:
            The result of ``cursor.fetchall()``, or None when there is no
            open connection or the query failed (the error is printed).
        """
        if self.conn is None:
            print("查询执行失败:数据库未连接")
            return None

        cursor = None
        try:
            cursor = self.conn.cursor()
            if params:
                cursor.execute(sql, params)
            else:
                cursor.execute(sql)
            return cursor.fetchall()
        except Exception as e:
            # NOTE(review): assumes ksycopg2 follows psycopg2 semantics, where
            # a failed statement leaves the transaction aborted until it is
            # rolled back -- confirm against the driver documentation.
            self._rollback_quietly()
            print(f"查询执行失败:{e}")
            return None
        finally:
            # Release the cursor on both the success and the failure path.
            if cursor is not None:
                cursor.close()

    def execute_update(self, sql, params=None) -> int:
        """Run a data-modifying statement (INSERT/UPDATE/DELETE) and commit.

        Args:
            sql: SQL text, using ``%s`` placeholders for parameters.
            params: Optional sequence of values bound to the placeholders.

        Returns:
            The cursor's ``rowcount`` after a successful commit, or -1 when
            there is no open connection or the statement failed (the
            transaction is rolled back and the error is printed).
        """
        if self.conn is None:
            print("操作执行失败:数据库未连接")
            return -1

        cursor = None
        try:
            cursor = self.conn.cursor()
            if params:
                cursor.execute(sql, params)
            else:
                cursor.execute(sql)
            self.conn.commit()
            affected_rows = cursor.rowcount
        except Exception as e:
            self._rollback_quietly()
            print(f"操作执行失败:{e}")
            return -1
        finally:
            # Release the cursor on both the success and the failure path.
            if cursor is not None:
                cursor.close()

        print(f"操作成功,影响行数:{affected_rows}")
        return affected_rows

    def insert_user(self, id, username, age):
        """Insert one row into ``user_info``; returns execute_update's result."""
        sql = "INSERT INTO user_info (id, username, age) VALUES (%s, %s, %s)"
        params = (id, username, age)
        return self.execute_update(sql, params)

    def select_all_users(self):
        """Return every ``user_info`` row ordered by id (None on failure)."""
        sql = "SELECT * FROM user_info ORDER BY id"
        return self.execute_query(sql)

    def select_user_by_id(self, id):
        """Return the ``user_info`` rows whose id matches (None on failure)."""
        sql = "SELECT * FROM user_info WHERE id = %s"
        params = (id,)
        return self.execute_query(sql, params)

    def update_user_age(self, id, new_age):
        """Set ``age`` for the given id; returns execute_update's result."""
        sql = "UPDATE user_info SET age = %s WHERE id = %s"
        params = (new_age, id)
        return self.execute_update(sql, params)

    def delete_user(self, id):
        """Delete the row with the given id; returns execute_update's result."""
        sql = "DELETE FROM user_info WHERE id = %s"
        params = (id,)
        return self.execute_update(sql, params)

    def search_users_by_age_range(self, min_age, max_age):
        """Return rows with ``age BETWEEN min_age AND max_age``, ordered by age.

        Returns None on failure.
        """
        sql = "SELECT * FROM user_info WHERE age BETWEEN %s AND %s ORDER BY age"
        params = (min_age, max_age)
        return self.execute_query(sql, params)
def main():
    """Run a small CRUD demo against a local KingbaseES server.

    Inserts five sample users, lists them, looks one up by id, updates one,
    filters by age range, deletes one and lists the remainder. The
    connection is always closed, even if a step raises.
    """
    # NOTE(review): connection settings (including the password) are
    # hard-coded demo values; move them to configuration before real use.
    db_manager = KingbaseESManager(
        dbname="TEST",
        user="SYSTEM",
        password="qwe123!@#",
        host="127.0.0.1",
        port=54321,
    )
    if not db_manager.connect():
        return

    try:
        users_to_insert = [
            (1, '张三', 25),
            (2, '李四', 30),
            (3, '王五', 28),
            (4, '赵六', 35),
            (5, '钱七', 22),
        ]
        for user in users_to_insert:
            db_manager.insert_user(*user)

        # The query helpers return None on failure; "or []" keeps the demo
        # running instead of raising TypeError when iterating None.
        print("所有用户信息:")
        for user in db_manager.select_all_users() or []:
            print(user)

        print("\n查询 ID 为 2 的用户:")
        print(db_manager.select_user_by_id(2))

        print("\n更新张三的年龄为 26:")
        db_manager.update_user_age(1, 26)

        print("\n年龄在 25-30 岁之间的用户:")
        for user in db_manager.search_users_by_age_range(25, 30) or []:
            print(user)

        print("\n删除 ID 为 5 的用户:")
        db_manager.delete_user(5)

        print("\n删除后的所有用户:")
        for user in db_manager.select_all_users() or []:
            print(user)
    finally:
        # Close the connection even when a step above raised.
        db_manager.disconnect()


if __name__ == "__main__":
    main()