# Switch to the kingbase OS user
su - kingbase
# Connect to the database
ksql -U SYSTEM -d test -p 54321
# If the command above fails, invoke ksql by its full path instead
/kingbase/data/KESRealPro/V009R002C012/Server/bin/ksql -U SYSTEM -d test -p 54321
def query_data_example():
    """Run three sample SELECT queries against ``user_info`` and print the rows.

    Prints every user, the user whose id is 2, and the users aged 25 to 30.
    If the connection cannot be established the function returns without
    running any query.
    """
    # NOTE(review): demo credentials are hard-coded here; real code should
    # read them from configuration or the environment.
    db_manager = KingbaseESManager(
        dbname="test",
        user="SYSTEM",
        password="qwe123!@#",
        host="127.0.0.1",
        port="54321",
    )
    if not db_manager.connect():
        return

    try:
        print("所有用户信息:")
        select_all_sql = "SELECT * FROM user_info ORDER BY id"
        all_users = db_manager.execute_query(select_all_sql)
        # execute_query returns None when the query fails, so fall back to an
        # empty list instead of raising TypeError on iteration.
        for user in all_users or []:
            print(user)

        print("\n查询 ID 为 2 的用户:")
        select_by_id_sql = "SELECT * FROM user_info WHERE id = %s"
        user_by_id = db_manager.execute_query(select_by_id_sql, (2,))
        print(user_by_id)

        print("\n年龄在 25-30 岁之间的用户:")
        select_by_age_sql = (
            "SELECT * FROM user_info WHERE age BETWEEN %s AND %s ORDER BY age"
        )
        users_by_age = db_manager.execute_query(select_by_age_sql, (25, 30))
        for user in users_by_age or []:
            print(user)
    finally:
        # Release the connection even if one of the steps above raises.
        db_manager.disconnect()
# Run the query example.
query_data_example()
4.3 修改
def update_data_example():
    """Set the age of the user with id 1 to 26, then print that user's row.

    If the connection cannot be established the function returns without
    executing any statement.
    """
    # Placeholder credentials: replace "your_password" before running.
    db_manager = KingbaseESManager(
        dbname="TEST",
        user="SYSTEM",
        password="your_password",
        host="127.0.0.1",
        port="54321",
    )
    if not db_manager.connect():
        return

    try:
        print("更新张三的年龄为 26:")
        update_sql = "UPDATE user_info SET age = %s WHERE id = %s"
        db_manager.execute_update(update_sql, (26, 1))

        print("\n更新后的用户信息:")
        select_sql = "SELECT * FROM user_info WHERE id = %s"
        updated_user = db_manager.execute_query(select_sql, (1,))
        print(updated_user)
    finally:
        # Release the connection even if one of the steps above raises.
        db_manager.disconnect()
# Run the update example.
update_data_example()
4.4 删除
def delete_data_example():
    """Delete the user with id 5, then print the rows that remain.

    If the connection cannot be established the function returns without
    executing any statement.
    """
    # Placeholder credentials: replace "your_password" before running.
    db_manager = KingbaseESManager(
        dbname="TEST",
        user="SYSTEM",
        password="your_password",
        host="127.0.0.1",
        port="54321",
    )
    if not db_manager.connect():
        return

    try:
        print("删除 ID 为 5 的用户:")
        delete_sql = "DELETE FROM user_info WHERE id = %s"
        db_manager.execute_update(delete_sql, (5,))

        print("\n删除后的所有用户:")
        select_all_sql = "SELECT * FROM user_info ORDER BY id"
        remaining_users = db_manager.execute_query(select_all_sql)
        # execute_query returns None when the query fails, so fall back to an
        # empty list instead of raising TypeError on iteration.
        for user in remaining_users or []:
            print(user)
    finally:
        # Release the connection even if one of the steps above raises.
        db_manager.disconnect()
# Run the delete example.
delete_data_example()
4.5 封装一个 CRUD 类,方便复用
下面将实现完整的增删改查功能,并将这些操作封装在一个类中,方便复用。
import ksycopg2
from datetime import datetime
class KingbaseESManager:
    """Small CRUD helper around a single ``ksycopg2`` connection.

    The generic entry points are ``execute_query`` (returns fetched rows) and
    ``execute_update`` (commits and returns the affected row count). The
    remaining public methods are thin wrappers that run fixed SQL against the
    ``user_info`` table (columns ``id``, ``username``, ``age``).

    Errors are reported by printing a message and returning a sentinel
    (``False``, ``None`` or ``-1``) rather than by raising.
    """

    def __init__(self, dbname, user, password, host, port):
        """Store the connection parameters; no connection is opened yet."""
        self.conn = None
        self.db_params = {
            "database": dbname,
            "user": user,
            "password": password,
            "host": host,
            "port": port,
        }

    def connect(self):
        """Open the connection.

        Returns:
            True when the connection was established, False otherwise.
        """
        try:
            self.conn = ksycopg2.connect(**self.db_params)
        except Exception as e:
            print(f"连接数据库失败:{e}")
            return False
        print("数据库连接成功")
        return True

    def disconnect(self):
        """Close the connection if one is open; otherwise do nothing."""
        if self.conn:
            self.conn.close()
            # Drop the closed handle so later calls fail cleanly and a second
            # disconnect() is a no-op.
            self.conn = None
            print("数据库连接已关闭")

    def _rollback_quietly(self):
        """Roll back the current transaction, if there is a connection."""
        if self.conn is None:
            return
        try:
            self.conn.rollback()
        except Exception as e:
            print(f"回滚失败:{e}")

    @staticmethod
    def _close_quietly(cursor):
        """Close *cursor* if it was created, ignoring errors from close()."""
        if cursor is None:
            return
        try:
            cursor.close()
        except Exception:
            # Best-effort cleanup: a failing close() must not mask the
            # result or error of the statement that was just executed.
            pass

    def execute_query(self, sql, params=None):
        """Execute a row-returning statement and fetch all rows.

        Args:
            sql: SQL text, optionally with driver placeholders.
            params: Sequence of values for the placeholders, or None.

        Returns:
            The list returned by ``cursor.fetchall()``, or None when anything
            fails (including when no connection is open).
        """
        cursor = None
        try:
            cursor = self.conn.cursor()
            if params:
                cursor.execute(sql, params)
            else:
                cursor.execute(sql)
            return cursor.fetchall()
        except Exception as e:
            # NOTE(review): psycopg2-style drivers are expected to leave the
            # transaction aborted after a failed statement; rolling back
            # keeps the connection usable for the next call. Confirm for
            # ksycopg2.
            self._rollback_quietly()
            print(f"查询执行失败:{e}")
            return None
        finally:
            self._close_quietly(cursor)

    def execute_update(self, sql, params=None):
        """Execute a data-modifying statement and commit it.

        Args:
            sql: SQL text, optionally with driver placeholders.
            params: Sequence of values for the placeholders, or None.

        Returns:
            ``cursor.rowcount`` after a successful commit, or -1 when anything
            fails (the transaction is rolled back when a connection exists).
        """
        cursor = None
        try:
            cursor = self.conn.cursor()
            if params:
                cursor.execute(sql, params)
            else:
                cursor.execute(sql)
            self.conn.commit()
            affected_rows = cursor.rowcount
        except Exception as e:
            # Guarded rollback: with no open connection there is nothing to
            # roll back, and an unguarded call would raise from this handler.
            self._rollback_quietly()
            print(f"操作执行失败:{e}")
            return -1
        finally:
            self._close_quietly(cursor)
        print(f"操作成功,影响行数:{affected_rows}")
        return affected_rows

    def insert_user(self, id, username, age):
        """Insert one row into ``user_info``; returns like ``execute_update``."""
        sql = "INSERT INTO user_info (id, username, age) VALUES (%s, %s, %s)"
        params = (id, username, age)
        return self.execute_update(sql, params)

    def select_all_users(self):
        """Return all ``user_info`` rows ordered by id, or None on failure."""
        sql = "SELECT * FROM user_info ORDER BY id"
        return self.execute_query(sql)

    def select_user_by_id(self, id):
        """Return the ``user_info`` rows whose id equals *id*, or None on failure."""
        sql = "SELECT * FROM user_info WHERE id = %s"
        params = (id,)
        return self.execute_query(sql, params)

    def update_user_age(self, id, new_age):
        """Set ``age`` to *new_age* for the given id; returns like ``execute_update``."""
        sql = "UPDATE user_info SET age = %s WHERE id = %s"
        params = (new_age, id)
        return self.execute_update(sql, params)

    def delete_user(self, id):
        """Delete the row with the given id; returns like ``execute_update``."""
        sql = "DELETE FROM user_info WHERE id = %s"
        params = (id,)
        return self.execute_update(sql, params)

    def search_users_by_age_range(self, min_age, max_age):
        """Return rows with ``age BETWEEN min_age AND max_age``, ordered by age."""
        sql = "SELECT * FROM user_info WHERE age BETWEEN %s AND %s ORDER BY age"
        params = (min_age, max_age)
        return self.execute_query(sql, params)
if __name__ == "__main__":
    # End-to-end demo: insert five users, then query, update, search and
    # delete through the KingbaseESManager wrapper methods.
    # NOTE(review): demo credentials are hard-coded here; real code should
    # read them from configuration or the environment.
    db_manager = KingbaseESManager(
        dbname="TEST",
        user="SYSTEM",
        password="qwe123!@#",
        host="127.0.0.1",
        port="54321",
    )
    if db_manager.connect():
        try:
            users_to_insert = [
                (1, '张三', 25),
                (2, '李四', 30),
                (3, '王五', 28),
                (4, '赵六', 35),
                (5, '钱七', 22),
            ]
            for user in users_to_insert:
                db_manager.insert_user(*user)

            print("所有用户信息:")
            all_users = db_manager.select_all_users()
            # The select helpers return None when the query fails, so fall
            # back to an empty list instead of raising on iteration.
            for user in all_users or []:
                print(user)

            print("\n查询 ID 为 2 的用户:")
            user_by_id = db_manager.select_user_by_id(2)
            print(user_by_id)

            print("\n更新张三的年龄为 26:")
            db_manager.update_user_age(1, 26)

            print("\n年龄在 25-30 岁之间的用户:")
            users_by_age = db_manager.search_users_by_age_range(25, 30)
            for user in users_by_age or []:
                print(user)

            print("\n删除 ID 为 5 的用户:")
            db_manager.delete_user(5)

            print("\n删除后的所有用户:")
            remaining_users = db_manager.select_all_users()
            for user in remaining_users or []:
                print(user)
        finally:
            # Release the connection even if one of the steps above raises.
            db_manager.disconnect()