Ubuntu 系统下 Python 连接金仓 KingbaseES 数据库实现增删改查
本文介绍在 Ubuntu 环境下使用 Python 通过 ksycopg2 驱动连接金仓 KingbaseES 数据库的方法。内容包括驱动安装、环境变量配置、数据库连接建立、数据表创建以及增删改查(CRUD)功能的实现。文中提供了封装好的 KingbaseESManager 类示例,展示了如何复用数据库操作逻辑,并包含具体的代码验证步骤。

本文介绍在 Ubuntu 环境下使用 Python 通过 ksycopg2 驱动连接金仓 KingbaseES 数据库的方法。内容包括驱动安装、环境变量配置、数据库连接建立、数据表创建以及增删改查(CRUD)功能的实现。文中提供了封装好的 KingbaseESManager 类示例,展示了如何复用数据库操作逻辑,并包含具体的代码验证步骤。

KingbaseES 提供了专门的 Python 驱动包 ksycopg2,它是基于 Python DB API 2.0 规范实现的线程安全数据库适配器。
ksycopg2 是 Python 编程语言的 KingbaseES 数据库适配器。它的主要特点是 Python DB API 2.0 规范的完整实现和线程安全。
ksycopg2 主要在 C 程序中作为 libkci 包装器实现,因此既高效又安全。它拥有客户端和服务端游标,支持异步通信和通知、复制。
ksycopg2 驱动需要和 python 大版本一致,如 python3.8 的 ksycopg2 驱动支持 python3.8.x 的任意小版本。
首先需要下载并安装与你的 Python 版本和系统架构匹配的 ksycopg2 驱动。驱动可以从 KingbaseES 官方网站获取。
解压后可以看到 python2.7、python3.6、python3.7、python3.8、python3.9、python3.10、python3.11、python3.12 等版本。
import sys
print(sys.path)
运行后可查看 Python 的模块位置,例如 /usr/lib/python3/dist-packages。
开始上传到对应的 Python 模块目录。
此外,还需要将 KingbaseES 的 libkci 库文件路径添加到 LD_LIBRARY_PATH 环境变量中:
export LD_LIBRARY_PATH=/kingbase/data/KESRealPro/V009R002C012/Server/lib:$LD_LIBRARY_PATH
如果不清楚自己 KingbaseES 的 libkci 库文件路在哪里,可以用这个命令查看:
ps -ef | grep kingbase
import ksycopg2
print("ksycopg2 驱动安装成功")
使用 ksycopg2 连接 KingbaseES 数据库需要提供数据库名称、用户名、密码、主机地址和端口号等信息。
import ksycopg2
def create_connection():
try:
conn = ksycopg2.connect(
database="TEST",
user="SYSTEM",
password="qwe123!@#",
host="127.0.0.1",
port="54321"
)
print("数据库连接成功")
return conn
except Exception as e:
print(f"连接数据库失败:{e}")
return None
# 建立数据库连接
connection = create_connection()
创建一个 connect_database.py 把上面代码复制进去,然后执行:
python connect_database.py
在执行增删改查操作前,需要先创建一张测试表。
def create_table(conn):
try:
cursor = conn.cursor()
create_table_sql = """CREATE TABLE IF NOT EXISTS user_info (
id INTEGER PRIMARY KEY,
username VARCHAR(50) NOT NULL,
age INTEGER,
created_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)"""
cursor.execute(create_table_sql)
conn.commit()
cursor.close()
print("表创建成功")
except Exception as e:
print(f"创建表失败:{e}")
conn.rollback()
# 创建表
if connection:
create_table(connection)
同理,在 ubuntu 服务器上新建一个 create_table.py 文件,把上面代码丢进去执行:
python create_table.py
def insert_data_example():
db_manager = KingbaseESManager(
dbname="test",
user="SYSTEM",
password="qwe123!@#",
host="127.0.0.1",
port="54321"
)
if db_manager.connect():
# 插入单条数据
insert_sql = "INSERT INTO user_info (id, username, age) VALUES (%s, %s, %s)"
params = (1, '张三', 25)
db_manager.execute_update(insert_sql, params)
# 插入多条数据
users_to_insert = [
(2, '李四', 30),
(3, '王五', 28),
(4, '赵六', 35),
(5, '钱七', 22)
]
for user in users_to_insert:
db_manager.execute_update(insert_sql, user)
db_manager.disconnect()
insert_data_example()
调用 python 后登录数据库查看是否已经插入数据库,ubuntu 登录如下所示:
# 切换到 kingbase 用户
su - kingbase
# 连接数据库
ksql -U SYSTEM -d test -p 54321
# 如果上面执行不成功,可以指定目录来连接
/kingbase/data/KESRealPro/V009R002C012/Server/bin/ksql -U SYSTEM -d test -p 54321
执行查询语句,查询所有用户数据,验证插入结果:
-- 查询所有用户数据,验证插入结果
SELECT * FROM user_info ORDER BY id;
def query_data_example():
db_manager = KingbaseESManager(
dbname="test",
user="SYSTEM",
password="qwe123!@#",
host="127.0.0.1",
port="54321"
)
if db_manager.connect():
print("所有用户信息:")
select_all_sql = "SELECT * FROM user_info ORDER BY id"
all_users = db_manager.execute_query(select_all_sql)
for user in all_users:
print(user)
print("\n查询 ID 为 2 的用户:")
select_by_id_sql = "SELECT * FROM user_info WHERE id = %s"
user_by_id = db_manager.execute_query(select_by_id_sql, (2,))
print(user_by_id)
print("\n年龄在 25-30 岁之间的用户:")
select_by_age_sql = "SELECT * FROM user_info WHERE age BETWEEN %s AND %s ORDER BY age"
users_by_age = db_manager.execute_query(select_by_age_sql, (25, 30))
for user in users_by_age:
print(user)
db_manager.disconnect()
query_data_example()
def update_data_example():
db_manager = KingbaseESManager(
dbname="TEST",
user="SYSTEM",
password="your_password",
host="127.0.0.1",
port="54321"
)
if db_manager.connect():
print("更新张三的年龄为 26:")
update_sql = "UPDATE user_info SET age = %s WHERE id = %s"
db_manager.execute_update(update_sql, (26, 1))
print("\n更新后的用户信息:")
select_sql = "SELECT * FROM user_info WHERE id = %s"
updated_user = db_manager.execute_query(select_sql, (1,))
print(updated_user)
db_manager.disconnect()
update_data_example()
def delete_data_example():
db_manager = KingbaseESManager(
dbname="TEST",
user="SYSTEM",
password="your_password",
host="127.0.0.1",
port="54321"
)
if db_manager.connect():
print("删除 ID 为 5 的用户:")
delete_sql = "DELETE FROM user_info WHERE id = %s"
db_manager.execute_update(delete_sql, (5,))
print("\n删除后的所有用户:")
select_all_sql = "SELECT * FROM user_info ORDER BY id"
remaining_users = db_manager.execute_query(select_all_sql)
for user in remaining_users:
print(user)
db_manager.disconnect()
delete_data_example()
下面将实现完整的增删改查功能,并将这些操作封装在一个类中,方便复用。
import ksycopg2
from datetime import datetime
class KingbaseESManager:
def __init__(self, dbname, user, password, host, port):
self.conn = None
self.db_params = {
"database": dbname,
"user": user,
"password": password,
"host": host,
"port": port
}
def connect(self):
try:
self.conn = ksycopg2.connect(**self.db_params)
print("数据库连接成功")
return True
except Exception as e:
print(f"连接数据库失败:{e}")
return False
def disconnect(self):
if self.conn:
self.conn.close()
print("数据库连接已关闭")
def execute_query(self, sql, params=None):
try:
cursor = self.conn.cursor()
if params:
cursor.execute(sql, params)
else:
cursor.execute(sql)
results = cursor.fetchall()
cursor.close()
return results
except Exception as e:
print(f"查询执行失败:{e}")
return None
def execute_update(self, sql, params=None):
try:
cursor = self.conn.cursor()
if params:
cursor.execute(sql, params)
else:
cursor.execute(sql)
self.conn.commit()
affected_rows = cursor.rowcount
cursor.close()
print(f"操作成功,影响行数:{affected_rows}")
return affected_rows
except Exception as e:
self.conn.rollback()
print(f"操作执行失败:{e}")
return -1
def insert_user(self, id, username, age):
sql = "INSERT INTO user_info (id, username, age) VALUES (%s, %s, %s)"
params = (id, username, age)
return self.execute_update(sql, params)
def select_all_users(self):
sql = "SELECT * FROM user_info ORDER BY id"
return self.execute_query(sql)
def select_user_by_id(self, id):
sql = "SELECT * FROM user_info WHERE id = %s"
params = (id,)
return self.execute_query(sql, params)
def update_user_age(self, id, new_age):
sql = "UPDATE user_info SET age = %s WHERE id = %s"
params = (new_age, id)
return self.execute_update(sql, params)
def delete_user(self, id):
sql = "DELETE FROM user_info WHERE id = %s"
params = (id,)
return self.execute_update(sql, params)
def search_users_by_age_range(self, min_age, max_age):
sql = "SELECT * FROM user_info WHERE age BETWEEN %s AND %s ORDER BY age"
params = (min_age, max_age)
return self.execute_query(sql, params)
if __name__ == "__main__":
db_manager = KingbaseESManager(
dbname="TEST",
user="SYSTEM",
password="qwe123!@#",
host="127.0.0.1",
port="54321"
)
if db_manager.connect():
users_to_insert = [
(1, '张三', 25),
(2, '李四', 30),
(3, '王五', 28),
(4, '赵六', 35),
(5, '钱七', 22)
]
for user in users_to_insert:
db_manager.insert_user(*user)
print("所有用户信息:")
all_users = db_manager.select_all_users()
for user in all_users:
print(user)
print("\n查询 ID 为 2 的用户:")
user_by_id = db_manager.select_user_by_id(2)
print(user_by_id)
print("\n更新张三的年龄为 26:")
db_manager.update_user_age(1, 26)
print("\n年龄在 25-30 岁之间的用户:")
users_by_age = db_manager.search_users_by_age_range(25, 30)
for user in users_by_age:
print(user)
print("\n删除 ID 为 5 的用户:")
db_manager.delete_user(5)
print("\n删除后的所有用户:")
remaining_users = db_manager.select_all_users()
for user in remaining_users:
print(user)
db_manager.disconnect()
本文介绍了在 Ubuntu 系统中使用 Python 连接国产金仓数据库 KingbaseES 的方法。主要内容包括:1. 安装与 Python 版本匹配的 ksycopg2 驱动;2. 配置环境变量和连接参数;3. 实现数据库连接、建表及增删改查操作;4. 封装可复用的数据库操作类。通过具体代码示例演示了数据插入、查询、更新和删除等常见操作,并提供了验证方法。文章还分享了环境配置中的常见问题和解决方案,帮助开发者快速上手 KingbaseES 数据库操作。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML 转 Markdown 互为补充。 在线工具,Markdown 转 HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML 转 Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online