def insert_data_example():
    """Insert five sample rows into ``user_info``, one INSERT per row.

    Opens a connection with the hard-coded demo settings, writes the rows
    through ``KingbaseESManager.execute_update`` and disconnects afterwards.
    Does nothing further when the connection attempt fails.
    """
    manager = KingbaseESManager(
        dbname="test",
        user="SYSTEM",
        password="qwe123!@#",
        host="127.0.0.1",
        port="54321",
    )
    if not manager.connect():
        return

    statement = "INSERT INTO user_info (id, username, age) VALUES (%s, %s, %s)"

    # The first row is written on its own, the remaining four in a loop.
    manager.execute_update(statement, (1, '张三', 25))
    remaining_rows = [
        (2, '李四', 30),
        (3, '王五', 28),
        (4, '赵六', 35),
        (5, '钱七', 22),
    ]
    for row in remaining_rows:
        manager.execute_update(statement, row)

    manager.disconnect()
# Run the insert example as soon as this statement is reached (module level).
# NOTE(review): KingbaseESManager is defined further down in this file; unless
# something earlier provides it, this call raises NameError -- confirm ordering.
insert_data_example()
def query_data_example():
    """Run three sample SELECTs against ``user_info`` and print the results.

    The queries are: all rows ordered by id, the row with id 2, and the rows
    whose age lies between 25 and 30 (inclusive, per SQL ``BETWEEN``).

    ``KingbaseESManager.execute_query`` returns ``None`` when a query fails,
    so the row loops fall back to an empty sequence instead of raising
    ``TypeError``. The connection is closed even if an unexpected error
    occurs while querying or printing.
    """
    db_manager = KingbaseESManager(
        dbname="test", user="SYSTEM", password="qwe123!@#",
        host="127.0.0.1", port="54321"
    )
    if not db_manager.connect():
        return

    try:
        select_all_sql = "SELECT * FROM user_info ORDER BY id"
        all_users = db_manager.execute_query(select_all_sql)
        # A failed query yields None; iterate over nothing in that case.
        for user in all_users or []:
            print(user)

        select_by_id_sql = "SELECT * FROM user_info WHERE id = %s"
        user_by_id = db_manager.execute_query(select_by_id_sql, (2,))
        # Printed as-is: a list of rows, or None if the query failed.
        print(user_by_id)

        select_by_age_sql = "SELECT * FROM user_info WHERE age BETWEEN %s AND %s ORDER BY age"
        users_by_age = db_manager.execute_query(select_by_age_sql, (25, 30))
        for user in users_by_age or []:
            print(user)
    finally:
        # Always release the connection, whatever happened above.
        db_manager.disconnect()
# Run the query example as soon as this statement is reached (module level).
# NOTE(review): KingbaseESManager is defined further down in this file; unless
# something earlier provides it, this call raises NameError -- confirm ordering.
query_data_example()
def update_data_example():
    """Set the age of the ``user_info`` row with id 1 to 26.

    Connects with the hard-coded demo settings, issues a single UPDATE via
    ``KingbaseESManager.execute_update`` and disconnects. Nothing is
    executed when the connection attempt fails.
    """
    manager = KingbaseESManager(
        dbname="TEST",
        user="SYSTEM",
        password="qwe123!@#",
        host="127.0.0.1",
        port="54321",
    )
    if not manager.connect():
        return

    new_age, target_id = 26, 1
    manager.execute_update(
        "UPDATE user_info SET age = %s WHERE id = %s",
        (new_age, target_id),
    )
    manager.disconnect()
def delete_data_example():
    """Delete the ``user_info`` row with id 5.

    Connects with the hard-coded demo settings, issues a single DELETE via
    ``KingbaseESManager.execute_update`` and disconnects. Nothing is
    executed when the connection attempt fails.
    """
    manager = KingbaseESManager(
        dbname="TEST",
        user="SYSTEM",
        password="qwe123!@#",
        host="127.0.0.1",
        port="54321",
    )
    if not manager.connect():
        return

    target_id = 5
    manager.execute_update("DELETE FROM user_info WHERE id = %s", (target_id,))
    manager.disconnect()
import ksycopg2
from datetime import datetime
class KingbaseESManager:
    """Small convenience wrapper around a single ``ksycopg2`` connection.

    The manager stores the connection settings, opens/closes the connection
    on request and offers two generic helpers (``execute_query`` for
    statements that return rows, ``execute_update`` for statements that
    modify data) plus a few ``user_info``-specific shortcuts built on them.

    Errors are reported by printing a message and returning a sentinel
    (``False``, ``None`` or ``-1``) rather than by raising.
    """

    def __init__(self, dbname, user, password, host, port):
        """Remember the connection settings; no connection is opened here.

        Args:
            dbname: Database name, passed to the driver as ``database``.
            user: Login name.
            password: Login password.
            host: Server host name or address.
            port: Server port.
        """
        self.conn = None
        self.db_params = {
            "database": dbname,
            "user": user,
            "password": password,
            "host": host,
            "port": port,
        }

    def connect(self) -> bool:
        """Open the connection described by ``self.db_params``.

        Returns:
            True when the connection was established, False otherwise (the
            driver error is printed).
        """
        try:
            self.conn = ksycopg2.connect(**self.db_params)
        except Exception as e:
            print(f"连接数据库失败:{e}")
            return False
        print("数据库连接成功")
        return True

    def disconnect(self):
        """Close the connection if one is open; otherwise do nothing."""
        if self.conn is None:
            return
        try:
            self.conn.close()
        finally:
            # Drop the reference so later calls see "not connected" instead
            # of operating on a closed connection object.
            self.conn = None
        print("数据库连接已关闭")

    @staticmethod
    def _execute(cursor, sql, params):
        """Run ``sql`` on ``cursor``, passing ``params`` only when non-empty."""
        if params:
            cursor.execute(sql, params)
        else:
            cursor.execute(sql)

    def _rollback_quietly(self):
        """Roll back the current transaction, printing instead of raising."""
        try:
            self.conn.rollback()
        except Exception as e:
            print(f"回滚失败:{e}")

    def execute_query(self, sql, params=None):
        """Execute a row-returning statement and fetch all rows.

        Args:
            sql: SQL text, using ``%s`` placeholders for parameters.
            params: Optional sequence of values bound to the placeholders.

        Returns:
            The result of ``cursor.fetchall()``, or None when the manager is
            not connected or the statement fails (the error is printed).
        """
        if self.conn is None:
            print("查询执行失败:数据库未连接")
            return None
        try:
            cursor = self.conn.cursor()
            try:
                self._execute(cursor, sql, params)
                return cursor.fetchall()
            finally:
                # Release the cursor on both the success and failure paths.
                cursor.close()
        except Exception as e:
            # NOTE(review): assuming ksycopg2 follows psycopg2 transaction
            # semantics, a failed statement leaves the transaction aborted
            # until it is rolled back -- confirm against the driver docs.
            self._rollback_quietly()
            print(f"查询执行失败:{e}")
            return None

    def execute_update(self, sql, params=None):
        """Execute a data-modifying statement and commit it.

        Args:
            sql: SQL text, using ``%s`` placeholders for parameters.
            params: Optional sequence of values bound to the placeholders.

        Returns:
            ``cursor.rowcount`` after a successful commit, or -1 when the
            manager is not connected or the statement fails. On failure the
            transaction is rolled back and the error is printed.
        """
        if self.conn is None:
            # Without this guard the rollback in the error path would itself
            # raise AttributeError and escape the handler.
            print("操作执行失败:数据库未连接")
            return -1
        try:
            cursor = self.conn.cursor()
            try:
                self._execute(cursor, sql, params)
                self.conn.commit()
                affected_rows = cursor.rowcount
            finally:
                cursor.close()
        except Exception as e:
            self._rollback_quietly()
            print(f"操作执行失败:{e}")
            return -1
        print(f"操作成功,影响行数:{affected_rows}")
        return affected_rows

    def insert_user(self, id, username, age):
        """Insert one row into ``user_info``; returns like ``execute_update``."""
        sql = "INSERT INTO user_info (id, username, age) VALUES (%s, %s, %s)"
        params = (id, username, age)
        return self.execute_update(sql, params)

    def select_all_users(self):
        """Return all ``user_info`` rows ordered by id (None on failure)."""
        sql = "SELECT * FROM user_info ORDER BY id"
        return self.execute_query(sql)

    def select_user_by_id(self, id):
        """Return the ``user_info`` rows matching ``id`` (None on failure)."""
        sql = "SELECT * FROM user_info WHERE id = %s"
        params = (id,)
        return self.execute_query(sql, params)

    def update_user_age(self, id, new_age):
        """Set ``age`` for the row with ``id``; returns like ``execute_update``."""
        sql = "UPDATE user_info SET age = %s WHERE id = %s"
        params = (new_age, id)
        return self.execute_update(sql, params)

    def delete_user(self, id):
        """Delete the row with ``id``; returns like ``execute_update``."""
        sql = "DELETE FROM user_info WHERE id = %s"
        params = (id,)
        return self.execute_update(sql, params)

    def search_users_by_age_range(self, min_age, max_age):
        """Return rows with ``min_age <= age <= max_age`` ordered by age.

        Returns None when the query fails.
        """
        sql = "SELECT * FROM user_info WHERE age BETWEEN %s AND %s ORDER BY age"
        params = (min_age, max_age)
        return self.execute_query(sql, params)
if __name__ == "__main__":
    # End-to-end demo: insert five users, list them, update one, search by
    # age range, delete one and list the remainder.
    db_manager = KingbaseESManager(
        dbname="TEST", user="SYSTEM", password="qwe123!@#",
        host="127.0.0.1", port="54321"
    )
    if db_manager.connect():
        try:
            users_to_insert = [
                (1, '张三', 25), (2, '李四', 30),
                (3, '王五', 28), (4, '赵六', 35), (5, '钱七', 22)
            ]
            for user in users_to_insert:
                db_manager.insert_user(*user)

            print("所有用户信息:")
            # The query helpers return None on failure; fall back to an empty
            # list so the loops below do not raise TypeError.
            all_users = db_manager.select_all_users() or []
            for user in all_users:
                print(user)

            print("\n更新张三的年龄为 26:")
            db_manager.update_user_age(1, 26)

            print("\n年龄在 25-30 岁之间的用户:")
            users_by_age = db_manager.search_users_by_age_range(25, 30) or []
            for user in users_by_age:
                print(user)

            print("\n删除 ID 为 5 的用户:")
            db_manager.delete_user(5)

            print("\n删除后的所有用户:")
            remaining_users = db_manager.select_all_users() or []
            for user in remaining_users:
                print(user)
        finally:
            # Close the connection even if one of the steps above raised.
            db_manager.disconnect()