跳到主要内容
基于 KaiwuDB 与 CodeArts 智能体的智能家居本地化数据处理系统构建 | 极客日志
Python AI
基于 KaiwuDB 与 CodeArts 智能体的智能家居本地化数据处理系统构建 针对智能家居云端数据处理的网络依赖、延迟高及隐私泄露问题,提出基于 KaiwuDB 多模时序数据库与华为 CodeArts 代码智能体的本地化解决方案。通过 VSCode 远程连接服务器,利用 CodeArts 智能体实现 KaiwuDB 的 Docker 自动化部署及系统全模块开发。系统涵盖设备接入、数据处理、规则引擎及本地存储,支持断网运行与低延迟查询。最终构建零云端依赖、高隐私保护的智能家居本地化数据处理系统,降低技术门槛并适配家庭场景需求。
魔法巫师 发布于 2026/4/6 更新于 2026/5/20 26 浏览针对智能家居云端数据处理模式的网络依赖、低延迟性差、隐私泄露三大痛点,本文介绍基于 KaiwuDB(KWDB)多模时序数据库 + 华为 CodeArts 代码智能体的本地化数据处理解决方案。从环境搭建、KWDB 自动化部署,到系统全模块开发、接口测试实现全流程落地,打造零云端依赖、低延迟、高隐私的智能家居本地化数据处理系统。
随着智能家居设备渗透率持续提升,家庭中温湿度传感器、智能灯、空调、门锁等设备呈规模化增长,设备运行产生的时序数据 (温湿度、能耗、设备状态)与关系型数据 (设备信息、规则配置)呈爆发式增长,对数据的存储、处理与利用提出更高要求。
本文选择KaiwuDB 作为本地化数据存储与计算核心,华为 CodeArts 代码智能体 作为自动化研发引擎,二者结合实现智能家居本地化数据处理系统的高效构建。
KaiwuDB:适配 AIoT 场景的多模数据库基座
KaiwuDB(开源版本简称 KWDB)是开放原子开源基金会孵化的分布式多模数据库,专为 AIoT 场景设计,核心特性完美匹配智能家居本地化需求:
多模融合 :支持时序库与关系库统一实例管理,无需多库部署,一站式处理智能家居时序数据与关系型数据,简化架构与运维成本;
极致性能 :就地计算、智能预计算技术实现百万级数据秒级写入、亿级数据秒级读取,单机支持 1 秒 20 亿记录数据探索,满足设备高频采集与低延迟查询需求;
本地高可靠 :支持离线运行、断网可用,数据全程本地留存,从根源保障隐私;特有压缩算法节省 90% 时序数据存储空间,降低本地硬件成本;
高兼容性 :支持标准 SQL 语法与主流数据库工具,协议插件框架可快速适配 MQTT、Modbus 等智能家居设备协议,扩展性强。
华为 CodeArts 代码智能体:自动化研发效率引擎
华为云 CodeArts 代码智能体融合 AI 大模型与 IDE 工具,实现自然语言到代码的全流程转化,核心能力如下:
自然语言转开发 :将业务需求直接转化为结构化开发步骤,自动完成代码生成、配置编写、接口开发,非专业开发者也可参与系统构建;
环境自动适配 :自主调用 Shell 命令、代码搜索等工具,完成 KaiwuDB 部署、表结构设计、硬件环境适配,大幅减少手动操作;
代码质量保障 :内置代码规范检查、漏洞扫描、单元测试生成,确保代码符合工业级标准,支持多人协同与快速迭代;
全流程自动化 :从数据库部署到系统开发、测试,全程通过自然语言指令驱动,大幅缩短研发周期。
一、环境准备
1.1 配置 VSCode 远程链接服务器
所有操作在 VSCode 下完成。
1.1.1 首先在 VSCode 下载安装 remote-ssh 扩展,打开 VSCode,在扩展里搜索 remote-ssh,然后点击安装。
1.1.2 链接远程服务器
点击 remote-ssh 扩展,点击加号,然后按着提示输入:ssh [email protected] ,这里服务器替换成自己的 ip 与用户,然后回车确认。
选择 ssh 信息存放的配置文件。
因为我们是 linux 的服务器,这里选择 linux。
1.2 准备 CodeArts 代码智能体 CodeArts 代码智能体是基于智能生成、智能问答两大核心能力构建起一套全方位、多层次的智能开发体系。
在 VSCode 下载安装 CodeArts 代码智能体,在扩展里搜索 CodeArts,然后点击安装。
安装完成后点击 CodeArts 图标,然后点击登录。
在 CodeArts 代码智能体对话框下侧点 Agent 模式,在弹出的菜单最后侧点设置按钮,进入授权所有自动化操作界面,分别勾选读取文件和目录 、编辑文件 、执行命令 、更新代办、执行 task 工具、使用浏览器 。
至此 CodeArts 代码智能体已经准备完成了,下面全部交给 ai 帮我们部署数据库,开发部署应用。
二、数据库自动部署 借助 CodeArts 代码智能体的自动化能力,通过自然语言指令即可完成 KaiwuDB 的 Docker 化部署,全程无需手动编写部署脚本。
在 CodeArts 代码智能体里输入提示词:帮我使用 docker 部署一个最新版的 kwdb,然后等待数据库部署。
等待十几分钟后我们来看成果,可以看到数据库已经部署成功了。
docker exec -it kwdb-server bash
连接到 kwdb 的 SQL 客户端查看下数据库版本,及数据的基础功能是否正常,若终端显示 KaiwuDB 版本号(如 3.1.0),且容器状态为「Up」,则说明部署成功。
docker exec -it kwdb-server /kaiwudb/bin/kwbase sql --insecure
部署后可通过以下命令管理 KWDB 容器,方便后续运维:
docker ps --filter "name=kwdb-server"
docker logs -f kwdb-server
docker exec -it kwdb-server bash
docker exec -it kwdb-server /kaiwudb/bin/kwbase sql --insecure
docker-compose down
docker-compose up -d
三、智能家居本地化数据处理系统开发
3.1 发送开发指令,启动自动化构建 基于已部署的 KaiwuDB,通过 CodeArts 智能体指令驱动,自动完成智能家居本地化数据处理系统的全模块开发,包括设备接入、数据处理、规则引擎、本地存储、Web API 等核心模块。
先构造好提示词如下,在 CodeArts 智能体模式下,输入以下 Prompt 指令,引导智能体完成应用开发:
使用上面部署的 kwdb 多模时序数据库请为我设计一套「智能家居本地化数据处理系统」,核心目标是:
1. 所有设备数据(如传感器数据、设备控制指令、用户行为数据)均在本地处理,不上传至云端,保障用户隐私;
2. 系统具备低延迟、高稳定性,支持断网情况下智能家居设备正常联动;
3. 适配常见智能家居设备(如温湿度传感器、人体红外传感器、智能灯、智能窗帘、智能空调、智能门锁);
4. 支持简单的自动化规则自定义(如'当检测到有人且光线低于阈值时自动开灯')。
5. 核心功能模块设计:
- 设备接入模块:如何适配不同协议的设备,实现数据采集;
- 本地化数据处理模块:实时数据解析、清洗、转换的逻辑;
- 规则引擎模块:用户自定义自动化规则的存储与执行逻辑;
- 本地存储模块:数据存储策略(如哪些数据需要时序表、哪些数据持久化关系表、存储周期);
此时可以看到,智能体开始自动构建任务,生成开发方案、编写代码。
3.2 系统整体架构 系统采用分层架构设计,所有模块均基于本地运行,数据全量存储在 KaiwuDB,实现零云端依赖。
┌─────────────────────────────────────────────────────────────┐
│ 智能家居本地化数据处理系统 │
├─────────────────────────────────────────────────────────────┤
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 设备接入层 │ │ 数据处理层 │ │ 规则引擎层 │ │
│ │ - MQTT 协议 │ │ - 数据解析 │ │ - 规则存储 │ │
│ │ - HTTP 协议 │ │ - 数据清洗 │ │ - 规则匹配 │ │
│ │ - CoAP 协议 │ │ - 数据转换 │ │ - 规则执行 │ │
│ │ - Zigbee 网关 │ │ - 数据验证 │ │ - 动作触发 │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │ │ │
│ └──────────────────┼──────────────────┘ │
│ │ │
│ ┌────────▼────────┐ │
│ │ KaiwuDB 数据库 │ │
│ │ - 时序数据表 │ │
│ │ - 设备信息表 │ │
│ │ - 规则配置表 │ │
│ │ - 事件日志表 │ │
│ └─────────────────┘ │
└─────────────────────────────────────────────────────────────┘
3.3 项目结构 CodeArts 智能体将生成规范的项目目录,所有模块解耦设计,便于后续扩展与维护,核心项目结构如下:
smarthome-system/
├── README.md
├── docker-compose.yml
├── database/
│ ├── schema .sql
│ └── sample_data.sql
├── device-adapter/
│ ├── mqtt_adapter.py
│ ├── http_adapter.py
│ ├── coap_adapter.py
│ └── zigbee_adapter.py
├── data-processor/
│ ├── parser.py
│ ├── cleaner.py
│ ├── transformer.py
│ └── validator.py
├── rule-engine/
│ ├── rule_manager.py
│ ├── rule_executor.py
│ ├── condition_matcher.py
│ └── action_trigger.py
├── storage/
│ ├── timeseries_store.py
│ ├── relational_store.py
│ └── data_retention.py
├── web-api/
│ ├── app.py
│ ├── device_api.py
│ ├── rule_api.py
│ └── data_api.py
└── scripts/
├── init_system.py
└── test_system.py
3.4 KaiwuDB 核心库表设计
CREATE TS DATABASE IF NOT EXISTS tsdb;
USE tsdb;
CREATE TABLE IF NOT EXISTS home_sensor (
ts TIMESTAMP NOT NULL ,
value FLOAT ,
status VARCHAR (32 ),
file_path VARCHAR (128 )
) ATTRIBUTES (
device_id VARCHAR (64 ) NOT NULL ,
data_type VARCHAR (32 ) NOT NULL ,
location VARCHAR (32 ),
device_model VARCHAR (64 )
) PRIMARY TAGS(device_id)
RETENTIONS 20 D ACTIVETIME 12 h;
CREATE TABLE IF NOT EXISTS device_commands (
ts TIMESTAMP NOT NULL ,
command_type VARCHAR (64 ),
parameters varchar (64 ),
status VARCHAR (32 ),
executed_at TIMESTAMP ,
error_message VARCHAR (256 )
) ATTRIBUTES (
device_id VARCHAR (64 ) NOT NULL ,
command_id VARCHAR (128 ),
user_id VARCHAR (64 ),
source VARCHAR (32 )
) PRIMARY TAGS(device_id)
RETENTIONS 30 D ACTIVETIME 1 h;
CREATE TABLE IF NOT EXISTS rule_execution (
ts TIMESTAMP NOT NULL ,
trigger_event VARCHAR (32 ),
conditions_matched VARCHAR (32 ),
actions_executed VARCHAR (32 ),
status VARCHAR (32 ),
execution_time_ms INT ,
error_message VARCHAR (256 )
) ATTRIBUTES (
rule_id INT NOT NULL ,
rule_name VARCHAR (128 ) NOT NULL ,
priority INT ,
triggered_by VARCHAR (64 )
) PRIMARY TAGS(rule_id)
RETENTIONS 60 D ACTIVETIME 6 h;
CREATE TABLE IF NOT EXISTS system_events (
ts TIMESTAMP NOT NULL ,
event_type VARCHAR (64 ),
event_level VARCHAR (32 ),
message VARCHAR (512 ),
details VARCHAR (64 )
) ATTRIBUTES (
source VARCHAR (64 ) NOT NULL ,
device_id VARCHAR (64 )
) PRIMARY TAGS(source)
RETENTIONS 30 D ACTIVETIME 1 h;
CREATE TABLE IF NOT EXISTS user_activity (
ts TIMESTAMP NOT NULL ,
action_type VARCHAR (64 ),
device_id VARCHAR (64 ),
action_details VARCHAR (64 )
) ATTRIBUTES (
user_id VARCHAR (64 ) NOT NULL ,
session_id VARCHAR (128 ),
ip_address VARCHAR (64 ),
user_agent VARCHAR (256 )
) PRIMARY TAGS(user_id)
RETENTIONS 90 D ACTIVETIME 24 h;
USE defaultdb;
CREATE TABLE IF NOT EXISTS devices (
id INT PRIMARY KEY ,
name STRING NOT NULL ,
device_id STRING NOT NULL UNIQUE ,
device_type STRING NOT NULL ,
protocol STRING NOT NULL ,
location STRING,
status STRING DEFAULT 'online' ,
config JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_devices_device_id ON devices(device_id);
CREATE INDEX IF NOT EXISTS idx_devices_type ON devices(device_type);
CREATE INDEX IF NOT EXISTS idx_devices_location ON devices(location);
CREATE INDEX IF NOT EXISTS idx_devices_status ON devices(status);
CREATE TABLE IF NOT EXISTS automation_rules (
id INT PRIMARY KEY ,
name STRING NOT NULL ,
description STRING,
enabled BOOL DEFAULT TRUE ,
priority INT DEFAULT 0 ,
conditions JSONB NOT NULL ,
actions JSONB NOT NULL ,
created_by STRING DEFAULT 'system' ,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ,
last_triggered_at TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_rules_enabled ON automation_rules(enabled);
CREATE INDEX IF NOT EXISTS idx_rules_priority ON automation_rules(priority DESC );
CREATE TABLE IF NOT EXISTS data_statistics (
id INT PRIMARY KEY ,
device_id STRING NOT NULL ,
data_type STRING NOT NULL ,
stat_type STRING NOT NULL ,
time_window STRING NOT NULL ,
stat_value DECIMAL (20 ,6 ),
stat_time TIMESTAMP NOT NULL ,
calculated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_data_stats_device ON data_statistics(device_id, data_type, stat_type, time_window, stat_time);
CREATE TABLE IF NOT EXISTS device_snapshots (
id INT PRIMARY KEY ,
device_id STRING NOT NULL ,
snapshot_time TIMESTAMP NOT NULL ,
current_state JSONB,
battery_level INT ,
signal_strength INT ,
last_seen TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_device_snapshots_device ON device_snapshots(device_id, snapshot_time DESC );
CREATE SEQUENCE IF NOT EXISTS devices_seq START 1 ;
CREATE SEQUENCE IF NOT EXISTS automation_rules_seq START 1 ;
CREATE SEQUENCE IF NOT EXISTS data_statistics_seq START 1 ;
CREATE SEQUENCE IF NOT EXISTS device_snapshots_seq START 1 ;
CREATE VIEW IF NOT EXISTS v_device_latest_status AS
SELECT d.id, d.name, d.device_id, d.device_type, d.location, d.status, hs.ts as last_update, hs.value, hs.data_type, hs.status as sensor_status
FROM devices d
LEFT JOIN (
SELECT device_id, data_type, ts, value , status, ROW_NUMBER () OVER (PARTITION BY device_id ORDER BY ts DESC ) as rn
FROM tsdb.home_sensor
) hs ON d.device_id = hs.device_id AND hs.rn = 1 ;
CREATE VIEW IF NOT EXISTS v_active_rules AS
SELECT id, name, description, priority, conditions, actions, last_triggered_at
FROM automation_rules
WHERE enabled = TRUE
ORDER BY priority DESC ;
POST /api/data/receive - 接收设备数据
POST /api/device/<id >/control - 控制设备
GET /api/devices - 获取设备列表
GET /api/sensor-data/latest - 获取最新传感器数据
GET /api/rules - 获取自动化规则
GET /api/statistics - 获取统计数据
curl -X POST http://localhost:5001/api/data/receive \
-H "Content-Type: application/json" \
-d '{"device_id": "sensor_temp_livingroom", "value": 26.5, "data_type": "temperature", "unit": "°C"}'
curl -X POST http://localhost:5001/api/device/5/control \
-H "Content-Type: application/json" \
-d '{"command": "turn_on", "parameters": {"brightness": 80}}'
INSERT INTO devices (id, name, device_id, device_type, protocol, location, status, config) VALUES
(1 , '客厅温度传感器' , 'sensor_temp_livingroom' , 'temperature' , 'mqtt' , '客厅' , 'online' , '{"topic": "sensors/livingroom/temp", "interval": 60}' ),
(2 , '客厅湿度传感器' , 'sensor_humidity_livingroom' , 'humidity' , 'mqtt' , '客厅' , 'online' , '{"topic": "sensors/livingroom/humidity", "interval": 60}' ),
(3 , '客厅人体传感器' , 'sensor_motion_livingroom' , 'motion' , 'mqtt' , '客厅' , 'online' , '{"topic": "sensors/livingroom/motion", "interval": 5}' ),
(4 , '客厅光照传感器' , 'sensor_light_livingroom' , 'light' , 'mqtt' , '客厅' , 'online' , '{"topic": "sensors/livingroom/light", "interval": 30}' ),
(5 , '客厅智能灯' , 'device_light_livingroom' , 'smart_light' , 'mqtt' , '客厅' , 'online' , '{"topic": "devices/livingroom/light", "supported_commands": ["turn_on", "turn_off", "set_brightness", "set_color"]}' ),
(6 , '卧室温度传感器' , 'sensor_temp_bedroom' , 'temperature' , 'mqtt' , '卧室' , 'online' , '{"topic": "sensors/bedroom/temp", "interval": 60}' ),
(7 , '卧室智能灯' , 'device_light_bedroom' , 'smart_light' , 'mqtt' , '卧室' , 'online' , '{"topic": "devices/bedroom/light", "supported_commands": ["turn_on", "turn_off", "set_brightness"]}' ),
(8 , '智能门锁' , 'device_lock_entrance' , 'smart_lock' , 'mqtt' , '玄关' , 'online' , '{"topic": "devices/entrance/lock", "supported_commands": ["lock", "unlock", "get_status"]}' ),
(9 , '智能窗帘' , 'device_curtain_livingroom' , 'smart_curtain' , 'mqtt' , '客厅' , 'online' , '{"topic": "devices/livingroom/curtain", "supported_commands": ["open", "close", "set_position"]}' ),
(10 , '智能空调' , 'device_ac_livingroom' , 'smart_ac' , 'mqtt' , '客厅' , 'online' , '{"topic": "devices/livingroom/ac", "supported_commands": ["turn_on", "turn_off", "set_temperature", "set_mode"]}' );
INSERT INTO automation_rules (id, name, description, enabled, priority, conditions, actions) VALUES
(1 , '客厅有人自动开灯' , '检测到客厅有人且光线低于阈值时自动开灯' , TRUE , 10 , '{"conditions": [{"device_id": "sensor_motion_livingroom", "type": "motion", "operator": "=", "value": true}, {"device_id": "sensor_light_livingroom", "type": "light", "operator": "<", "value": 100}], "logic": "AND"}' , '{"actions": [{"device_id": "device_light_livingroom", "command": "turn_on", "parameters": {"brightness": 80}}]}' ),
(2 , '客厅无人自动关灯' , '客厅无人超过 10 分钟自动关灯' , TRUE , 5 , '{"conditions": [{"device_id": "sensor_motion_livingroom", "type": "motion", "operator": "=", "value": false, "duration": 600}], "logic": "AND"}' , '{"actions": [{"device_id": "device_light_livingroom", "command": "turn_off"}]}' ),
(3 , '温度过高自动开空调' , '客厅温度超过 28 度自动开启空调' , TRUE , 8 , '{"conditions": [{"device_id": "sensor_temp_livingroom", "type": "temperature", "operator": ">", "value": 28}], "logic": "AND"}' , '{"actions": [{"device_id": "device_ac_livingroom", "command": "turn_on", "parameters": {"temperature": 24, "mode": "cool"}}]}' ),
(4 , '离家模式' , '离家时关闭所有灯光和空调' , TRUE , 15 , '{"conditions": [{"device_id": "device_lock_entrance", "type": "door_lock", "operator": "=", "value": "locked"}], "logic": "AND"}' , '{"actions": [{"device_id": "device_light_livingroom", "command": "turn_off"}, {"device_id": "device_light_bedroom", "command": "turn_off"}, {"device_id": "device_ac_livingroom", "command": "turn_off"}]}' );
"""
智能家居 Web 应用主程序
提供数据展示和控制界面
"""
import sys
import os
import json
import logging
from datetime import datetime, timedelta
from flask import Flask, render_template, jsonify, request
sys.path.insert(0 , os.path.join(os.path.dirname(__file__), '../data-processor' ))
sys.path.insert(0 , os.path.join(os.path.dirname(__file__), '../rule-engine' ))
sys.path.insert(0 , os.path.join(os.path.dirname(__file__), '../storage' ))
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = Flask(__name__, template_folder='templates' , static_folder='static' )
class MockDB :
"""模拟数据库连接"""
def execute_query (self, query, params=None ):
"""执行查询"""
import subprocess
try :
cmd = ['docker' , 'exec' , 'kwdb-server' , '/kaiwudb/bin/kwbase' , 'sql' , '--insecure' , '--execute' , query]
result = subprocess.run(cmd, capture_output=True , text=True )
if result.returncode == 0 :
lines = result.stdout.strip().split('\n' )
if len (lines) > 1 :
headers = lines[0 ].split('\t' )
data = []
for line in lines[1 :]:
values = line.split('\t' )
if len (values) == len (headers):
data.append(values)
return data
return []
except Exception as e:
logger.error(f"查询失败:{e} " )
return []
db = MockDB()
@app.route('/' )
def index ():
"""首页"""
return render_template('index.html' )
@app.route('/api/devices' )
def get_devices ():
"""获取设备列表"""
query = "SELECT id, name, device_id, device_type, protocol, location, status FROM devices ORDER BY id"
results = db.execute_query(query)
devices = []
if results and len (results) > 1 :
headers = results[0 ]
for row in results[1 :]:
device = {
'id' : row[0 ],
'name' : row[1 ],
'device_id' : row[2 ],
'device_type' : row[3 ],
'protocol' : row[4 ],
'location' : row[5 ],
'status' : row[6 ]
}
devices.append(device)
return jsonify(devices)
@app.route('/api/devices/<int:device_id>/data' )
def get_device_data (device_id ):
"""获取设备数据"""
hours = request.args.get('hours' , 24 , type =int )
start_time = (datetime.now() - timedelta(hours=hours)).isoformat()
device_query = f"SELECT device_id FROM devices WHERE id = {device_id} "
device_result = db.execute_query(device_query)
if not device_result or len (device_result) < 2 :
return jsonify({'error' : '设备不存在' }), 404
device_id_str = device_result[1 ][0 ]
data_query = f""" SELECT ts, value, data_type, status FROM sensor_data WHERE device_id = '{device_id_str} ' AND ts >= '{start_time} ' ORDER BY ts DESC LIMIT 100 """
results = db.execute_query(data_query)
data_points = []
if results and len (results) > 1 :
for row in results[1 :]:
data_points.append({
'timestamp' : row[0 ],
'value' : float (row[1 ]) if row[1 ] else None ,
'data_type' : row[2 ],
'status' : row[3 ]
})
return jsonify({
'device_id' : device_id,
'device_id_str' : device_id_str,
'data' : data_points
})
@app.route('/api/sensor-data/latest' )
def get_latest_sensor_data ():
"""获取最新传感器数据"""
query = """ SELECT d.id, d.name, d.device_type, d.location, d.status, sd.ts as last_update, sd.value, sd.data_type, sd.unit FROM devices d LEFT JOIN ( SELECT device_id, data_type, ts, value, unit, ROW_NUMBER() OVER (PARTITION BY device_id ORDER BY ts DESC) as rn FROM sensor_data ) sd ON d.device_id = sd.device_id AND sd.rn = 1 ORDER BY d.id """
results = db.execute_query(query)
sensor_data = []
if results and len (results) > 1 :
for row in results[1 :]:
sensor_data.append({
'id' : row[0 ],
'name' : row[1 ],
'device_type' : row[2 ],
'location' : row[3 ],
'status' : row[4 ],
'last_update' : row[5 ],
'value' : float (row[6 ]) if row[6 ] else None ,
'data_type' : row[7 ],
'unit' : row[8 ] if len (row) > 8 else None
})
return jsonify(sensor_data)
@app.route('/api/rules' )
def get_rules ():
"""获取自动化规则"""
query = "SELECT id, name, description, enabled, priority, last_triggered_at FROM automation_rules ORDER BY priority DESC"
results = db.execute_query(query)
rules = []
if results and len (results) > 1 :
for row in results[1 :]:
rules.append({
'id' : row[0 ],
'name' : row[1 ],
'description' : row[2 ],
'enabled' : row[3 ] == 't' ,
'priority' : int (row[4 ]),
'last_triggered_at' : row[5 ]
})
return jsonify(rules)
@app.route('/api/statistics' )
def get_statistics ():
"""获取统计数据"""
stats = {}
device_query = "SELECT device_type, COUNT(*) as count, COUNT(CASE WHEN status = 'online' THEN 1 END) as online_count FROM devices GROUP BY device_type"
device_results = db.execute_query(device_query)
stats['devices' ] = {}
if device_results and len (device_results) > 1 :
for row in device_results[1 :]:
stats['devices' ][row[0 ]] = {
'total' : int (row[1 ]),
'online' : int (row[2 ])
}
data_query = "SELECT COUNT(*) as total_records FROM sensor_data"
data_results = db.execute_query(data_query)
if data_results and len (data_results) > 1 :
stats['sensor_data' ] = {
'total_records' : int (data_results[1 ][0 ])
}
rule_query = "SELECT COUNT(*) as total, COUNT(CASE WHEN enabled = TRUE THEN 1 END) as enabled FROM automation_rules"
rule_results = db.execute_query(rule_query)
if rule_results and len (rule_results) > 1 :
stats['rules' ] = {
'total' : int (rule_results[1 ][0 ]),
'enabled' : int (rule_results[1 ][1 ])
}
return jsonify(stats)
@app.route('/api/data/receive' , methods=['POST' ] )
def receive_data ():
"""接收设备数据"""
try :
data = request.json
if not data or 'device_id' not in data or 'value' not in data:
return jsonify({'error' : '缺少必要字段' }), 400
device_id = data['device_id' ]
value = data['value' ]
data_type = data.get('data_type' , 'value' )
status = data.get('status' , 'good' )
unit = data.get('unit' , '' )
query = f""" INSERT INTO sensor_data (time, device_id, data_type, value, status, unit) VALUES (NOW(), '{device_id} ', '{data_type} ', {value} , '{status} ', '{unit} ') """
db.execute_query(query)
return jsonify({'success' : True , 'message' : '数据接收成功' })
except Exception as e:
logger.error(f"接收数据失败:{e} " )
return jsonify({'error' : str (e)}), 500
@app.route('/api/device/<int:device_id>/control' , methods=['POST' ] )
def control_device (device_id ):
"""控制设备"""
try :
data = request.json
if not data or 'command' not in data:
return jsonify({'error' : '缺少命令参数' }), 400
command = data['command' ]
parameters = data.get('parameters' , {})
device_query = f"SELECT device_id, name FROM devices WHERE id = {device_id} "
device_result = db.execute_query(device_query)
if not device_result or len (device_result) < 2 :
return jsonify({'error' : '设备不存在' }), 404
device_id_str = device_result[1 ][0 ]
device_name = device_result[1 ][1 ]
cmd_query = f""" INSERT INTO device_commands (time, device_id, command_type, parameters, status) VALUES (NOW(), '{device_id_str} ', '{command} ', '{json.dumps(parameters)} ', 'success') """
db.execute_query(cmd_query)
logger.info(f"控制设备:{device_name} ({device_id_str} ), 命令:{command} " )
return jsonify({
'success' : True ,
'message' : f'设备 {device_name} 控制指令已发送' ,
'command' : command,
'parameters' : parameters
})
except Exception as e:
logger.error(f"控制设备失败:{e} " )
return jsonify({'error' : str (e)}), 500
if __name__ == '__main__' :
app.run(host='0.0.0.0' , port=5001 , debug=True )
四、系统运行与功能测试 启动系统 Web API 服务,并通过接口测试 与页面验证 ,确认系统所有功能正常运行。
1. 启动系统 Web API 服务 cd /root/workspace/smarthome-system/web-api && python3 simple_app.py > /tmp/smarthome-web-5002.log 2>&1 &
root@kwdb:~/workspace/smarthome-system/web-api# python3 simple_app.py
启动智能家居 Web 服务...
* Serving Flask app 'simple_app'
* Debug mode: off
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
* Running on all addresses (0.0.0.0)
* Running on http://127.0.0.1:5002
* Running on http://192.168.150.111:5002
Press CTRL+C to quit
127.0.0.1 - - [27/Feb/2026 13:06:51] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [27/Feb/2026 13:06:51] "GET /static/css/style.css HTTP/1.1" 200 -
127.0.0.1 - - [27/Feb/2026 13:06:51] "GET /static/js/app.js HTTP/1.1" 200 -
执行查询:SELECT id , name, device_type, protocol, location, status FROM devices ORDER BY id
127.0.0.1 - - [27/Feb/2026 13:06:51] "GET /favicon.ico HTTP/1.1" 404 -
查询结果:[['1' , '客厅温度传感器' , 'temperature' , 'mqtt' , '客厅' , 'online' ], ['2' , '客厅湿度传感器' , 'humidity' , 'mqtt' , '客厅' , 'online' ], ['3' , '客厅人体传感器' , 'motion' , 'mqtt' , '客厅' , 'online' ], ['4' , '客厅光照传感器' , 'light' , 'mqtt' , '客厅' , 'online' ], ['5' , '客厅智能灯' , 'smart_light' , 'mqtt' , '客厅' , 'online' ], ['6' , '卧室温度传感器' , 'temperature' , 'mqtt' , '卧室' , 'online' ], ['7' , '卧室智能灯' , 'smart_light' , 'mqtt' , '卧室' , 'online' ], ['8' , '智能门锁' , 'smart_lock' , 'mqtt' , '玄关' , 'online' ], ['9' , '智能窗帘' , 'smart_curtain' , 'mqtt' , '客厅' , 'online' ], ['10' , '智能空调' , 'smart_ac' , 'mqtt' , '客厅' , 'online' ]]
启动成功后,终端将显示:Running on http://[你的服务器 IP]:5002,即服务启动完成,点击在浏览器中打开可以看到程序运行的界面。
系统概览:设备总数、在线设备数、自动化规则数、数据记录数及设备列表。
实时数据:温湿度、光照、人体感应等传感器的最新数据与更新时间;自动化规则:所有规则的名称、描述、启用状态、优先级。
系统状态:KaiwuDB 版本、服务运行状态、接口文档,这里为了方便把数据接口放在了最下面,便于查找调用。
2. 测试接口 curl -X POST http://localhost:5002/api/data/receive -H "Content-Type: application/json" -d '{"device_id": "temperature", "value": 32, "data_type": "temperature", "unit": "°C"}'
将传感器实时数据上报至本地系统,数据自动存储至 KaiwuDB 时序表,这里可以看到客厅问题传感器已经标称 32 度。
curl -s http://localhost:5002/api/devices
五、总结 本文以 KaiwuDB 为数据基座,华为 CodeArts 代码智能体为自动化开发引擎,实现了智能家居本地化数据处理系统的快速构建,全程无需复杂的手动编码,新手也可按步骤复现。随着 AIoT 技术的发展,本地化数据处理将成为智能家居的主流趋势,KaiwuDB 多模数据库让开发者实现'轻量、高效、安全'的智能场景落地。
相关免费在线工具 RSA密钥对生成器 生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
Mermaid 预览与可视化编辑 基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
随机西班牙地址生成器 随机生成西班牙地址(支持马德里、加泰罗尼亚、安达卢西亚、瓦伦西亚筛选),支持数量快捷选择、显示全部与下载。 在线工具,随机西班牙地址生成器在线工具,online
curl 转代码 解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online