Ubuntu 下 Python 连接金仓 KingbaseES 数据库实现增删改查
在 Linux 环境下使用 Python 对接国产数据库,驱动配置往往是第一步。这里把连接金仓 KingbaseES 的经验整理一下,希望能帮到同样踩坑的兄弟。
1. 环境准备与驱动安装
KingbaseES 提供了专门的 Python 驱动包 ksycopg2,它是基于 Python DB API 2.0 规范实现的线程安全数据库适配器。该驱动主要在 C 程序中作为 libkci 包装器实现,因此既高效又安全,支持客户端和服务端游标、异步通信和通知等功能。
注意: ksycopg2 驱动需要和 Python 大版本一致,例如 python3.8 的 ksycopg2 驱动支持 python3.8.x 的任意小版本。
1.1 下载驱动
从 KingbaseES 官方网站获取与你的 Python 版本和系统架构匹配的驱动包。解压后通常能看到针对不同 Python 版本的目录(如 python3.6, python3.7, python3.8 等)。
1.2 安装驱动
-
确认 Python 模块路径 如果不确定 Python 的模块搜索路径在哪里,可以运行以下代码查看:
import sys print(sys.path)常见路径如
/usr/lib/python3/dist-packages。 -
复制驱动文件 将解压后的
ksycopg2文件夹上传至上述 Python 模块路径中。 -
配置环境变量 需要将 KingbaseES 的
libkci库文件路径添加到LD_LIBRARY_PATH环境变量中。如果不清楚具体路径,可以通过命令查看:ps -ef | grep kingbase假设路径为
/kingbase/data/KESRealPro/V009R002C012/Server/lib,则执行:export LD_LIBRARY_PATH=/kingbase/data/KESRealPro/V009R002C012/Server/lib:$LD_LIBRARY_PATH建议将此配置写入
.bashrc或.profile以便永久生效。 -
验证安装 运行以下代码检查驱动是否加载成功:
import ksycopg2 print("ksycopg2 驱动安装成功")
2. 连接数据库
使用 ksycopg2 连接 KingbaseES 需要提供数据库名称、用户名、密码、主机地址和端口号等信息。
import ksycopg2
def create_connection():
try:
conn = ksycopg2.connect(
database="TEST",
user="SYSTEM",
password="qwe123!@#",
host="127.0.0.1",
port=54321
)
print("数据库连接成功")
return conn
except Exception as e:
print(f"连接数据库失败:{e}")
return None
# 建立数据库连接
connection = create_connection()
3. 创建数据表
在执行增删改查操作前,需要先创建一张测试表。
def create_table(conn):
try:
cursor = conn.cursor()
create_table_sql = """
CREATE TABLE IF NOT EXISTS user_info (
id INTEGER PRIMARY KEY,
username VARCHAR(50) NOT NULL,
age INTEGER,
created_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
cursor.execute(create_table_sql)
conn.commit()
cursor.close()
print("表创建成功")
except Exception as e:
print(f"创建表失败:{e}")
conn.rollback()
if connection:
create_table(connection)
4. 实现增删改查功能
为了代码复用性,我们将封装一个管理类来处理数据库操作。
4.1 封装数据库管理类
下面是一个完整的 KingbaseESManager 类实现,包含了连接管理、查询、更新及具体的业务方法。
import ksycopg2
from datetime import datetime
class KingbaseESManager:
def __init__(self, dbname, user, password, host, port):
self.conn = None
self.db_params = {
"database": dbname,
"user": user,
"password": password,
"host": host,
"port": port
}
def connect(self):
"""连接数据库"""
try:
self.conn = ksycopg2.connect(**self.db_params)
print("数据库连接成功")
return True
except Exception as e:
print(f"连接数据库失败:{e}")
return False
def disconnect(self):
"""断开数据库连接"""
if self.conn:
self.conn.close()
print("数据库连接已关闭")
def execute_query(self, sql, params=None):
"""执行查询语句并返回结果"""
try:
cursor = self.conn.cursor()
if params:
cursor.execute(sql, params)
else:
cursor.execute(sql)
results = cursor.fetchall()
cursor.close()
return results
except Exception as e:
print(f"查询执行失败:{e}")
return None
def execute_update(self, sql, params=None):
"""执行更新操作(插入、更新、删除)"""
try:
cursor = self.conn.cursor()
if params:
cursor.execute(sql, params)
else:
cursor.execute(sql)
self.conn.commit()
affected_rows = cursor.rowcount
cursor.close()
print(f"操作成功,影响行数:{affected_rows}")
return affected_rows
except Exception as e:
self.conn.rollback()
print(f"操作执行失败:{e}")
return -1
def insert_user(self, id, username, age):
"""插入用户数据"""
sql = "INSERT INTO user_info (id, username, age) VALUES (%s, %s, %s)"
params = (id, username, age)
return self.execute_update(sql, params)
def select_all_users(self):
"""查询所有用户"""
sql = "SELECT * FROM user_info ORDER BY id"
return self.execute_query(sql)
def select_user_by_id(self, id):
"""根据 ID 查询用户"""
sql = "SELECT * FROM user_info WHERE id = %s"
params = (id,)
return self.execute_query(sql, params)
def update_user_age(self, id, new_age):
"""更新用户年龄"""
sql = "UPDATE user_info SET age = %s WHERE id = %s"
params = (new_age, id)
return self.execute_update(sql, params)
def delete_user(self, id):
"""删除用户"""
sql = "DELETE FROM user_info WHERE id = %s"
params = (id,)
return self.execute_update(sql, params)
def search_users_by_age_range(self, min_age, max_age):
"""根据年龄范围查询用户"""
sql = "SELECT * FROM user_info WHERE age BETWEEN %s AND %s ORDER BY age"
params = (min_age, max_age)
return self.execute_query(sql, params)
# 使用示例
if __name__ == "__main__":
# 创建数据库管理实例
db_manager = KingbaseESManager(
dbname="TEST",
user="SYSTEM",
password="qwe123!@#",
host="127.0.0.1",
port=54321
)
# 连接数据库
if db_manager.connect():
# 插入多条用户数据
users_to_insert = [
(1, '张三', 25),
(2, '李四', 30),
(3, '王五', 28),
(4, '赵六', 35),
(5, '钱七', 22)
]
for user in users_to_insert:
db_manager.insert_user(*user)
# 查询所有用户
print("所有用户信息:")
all_users = db_manager.select_all_users()
for user in all_users:
print(user)
# 根据 ID 查询用户
print("\n查询 ID 为 2 的用户:")
user_by_id = db_manager.select_user_by_id(2)
print(user_by_id)
# 更新用户年龄
print("\n更新张三的年龄为 26:")
db_manager.update_user_age(1, 26)
# 查询年龄在 25-30 岁之间的用户
print("\n年龄在 25-30 岁之间的用户:")
users_by_age = db_manager.search_users_by_age_range(25, 30)
for user in users_by_age:
print(user)
# 删除 ID 为 5 的用户
print("\n删除 ID 为 5 的用户:")
db_manager.delete_user(5)
# 再次查询所有用户
print("\n删除后的所有用户:")
remaining_users = db_manager.select_all_users()
for user in remaining_users:
print(user)
# 断开连接
db_manager.disconnect()
4.2 验证操作
执行脚本后,可以通过命令行登录数据库验证数据状态。
# 切换到 kingbase 用户
su - kingbase
# 连接数据库
ksql -U SYSTEM -d test -p 54321
# 如果上面执行不成功,可以指定目录来连接
/kingbase/data/KESRealPro/V009R002C012/Server/bin/ksql -U SYSTEM -d test -p 54321
在数据库中执行查询语句验证:
-- 查询所有用户数据
SELECT * FROM user_info ORDER BY id;
5. 总结
本文演示了在 Ubuntu 操作系统中利用 Python 语言通过 ksycopg2 驱动连接金仓 KingbaseES 数据库的全过程。内容涵盖驱动安装、环境变量配置、基础连接建立以及完整的增删改查(CRUD)功能实现。通过封装 KingbaseESManager 类,提供了可复用的数据库操作接口,包含单条与批量插入、条件查询、数据更新及删除等核心功能,并附带了验证步骤与常见问题排查建议,帮助开发者快速完成国产化数据库的集成开发。


