一、环境准备
1.1、配置 VSCode 远程链接服务器
所有操作在 VSCode 下完成。
1.1.1、首先在 VSCode 下载安装 Remote-SSH 扩展,打开 VSCode,在扩展里搜索 remote-ssh,然后点击安装。
1.1.2、链接远程服务器 点击 remote-ssh 扩展,点击加号,然后按着提示输入:ssh root@[服务器 IP],这里服务器替换成自己的 IP 与用户,然后回车确认。 选择 SSH 信息存放的配置文件。 至此配置完成,点击登录。 因为我们是 Linux 的服务器,这里选择 Linux。 输入密码然后回车完成。 至此链接远程服务器成功。
1.2、准备 CodeArts 代码智能体
CodeArts 代码智能体是基于智能生成、智能问答两大核心能力构建起一套全方位、多层次的智能开发体系。在智能生成方面,它能够依据开发者输入的需求描述,准确且高效地生成高质量代码。
1.2.1、安装 CodeArts 扩展 在 VSCode 下载安装 CodeArts 代码智能体,在扩展里搜索 CodeArts,然后点击安装。
1.2.2、CodeArts 登录 安装完成后点击 CodeArts 图标,然后点击登录。 然后登录华为云账号进行登录。 登录成功后如下。 在 CodeArts 代码智能体对话框下侧点 Agent 模式,在弹出的菜单最后侧点设置按钮,进入授权所有自动化操作界面,分别勾选读取文件和目录、编辑文件、执行命令、更新代办、执行 task 工具、使用浏览器。 至此 CodeArts 代码智能体已经准备完成了,下面全部交给 AI 帮我们部署数据库,开发部署应用。
二、数据库自动部署
借助 CodeArts 代码智能体的自动化能力,通过自然语言指令即可完成 KaiwuDB 的 Docker 化部署,全程无需手动编写部署脚本。
在 CodeArts 代码智能体里输入提示词:帮我使用 docker 部署一个最新版的 kwdb,然后等待数据库部署。 等待十几分钟后我们来看成果,可以看到数据库已经部署成功了。
进入容器查看 kwdb 的服务:
docker exec -it kwdb-server bash
连接到 kwdb 的 SQL 客户端查看下数据库版本,及数据的基础功能是否正常,若终端显示 KaiwuDB 版本号(如 3.1.0),且容器状态为「Up」,则说明部署成功。
docker exec -it kwdb-server /kaiwudb/bin/kwbase sql --insecure
部署后可通过以下命令管理 KWDB 容器,方便后续运维:
# 查看容器状态
docker ps --filter "name=kwdb-server"
# 查看日志
docker logs -f kwdb-server
# 进入容器
docker exec -it kwdb-server bash
# 连接到 SQL 客户端
docker exec -it kwdb-server /kaiwudb/bin/kwbase sql --insecure
# 停止服务
docker-compose down
# 启动服务
docker-compose up -d
三、智能家居本地化数据处理系统开发
3.1 发送开发指令,启动自动化构建
基于已部署的 KaiwuDB,通过 CodeArts 智能体指令驱动,自动完成智能家居本地化数据处理系统的全模块开发,包括设备接入、数据处理、规则引擎、本地存储、Web API 等核心模块。
先构造好提示词如下,在 CodeArts 智能体模式下,输入以下 Prompt 指令,引导智能体完成应用开发:
使用上面部署的 kwdb 多模时序数据库请为我设计一套「智能家居本地化数据处理系统」,核心目标是:
1. 所有设备数据(如传感器数据、设备控制指令、用户行为数据)均在本地处理,不上传至云端,保障用户隐私;
2. 系统具备低延迟、高稳定性,支持断网情况下智能家居设备正常联动;
3. 适配常见智能家居设备(如温湿度传感器、人体红外传感器、智能灯、智能窗帘、智能空调、智能门锁);
4. 支持简单的自动化规则自定义(如'当检测到有人且光线低于阈值时自动开灯')。
5. 核心功能模块设计:
- 设备接入模块:如何适配不同协议的设备,实现数据采集;
- 本地化数据处理模块:实时数据解析、清洗、转换的逻辑;
- 规则引擎模块:用户自定义自动化规则的存储与执行逻辑;
- 本地存储模块:数据存储策略(如哪些数据需要时序表、哪些数据持久化关系表、存储周期);
此时可以看到,智能体开始自动构建任务,生成开发方案、编写代码。
3.2、系统整体架构
系统采用分层架构设计,所有模块均基于本地运行,数据全量存储在 KaiwuDB,实现零云端依赖。
┌─────────────────────────────────────────────────────────────┐
│ 智能家居本地化数据处理系统 │
├─────────────────────────────────────────────────────────────┤
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 设备接入层 │ │ 数据处理层 │ │ 规则引擎层 │ │
│ │ - MQTT 协议 │ │ - 数据解析 │ │ - 规则存储 │ │
│ │ - HTTP 协议 │ │ - 数据清洗 │ │ - 规则匹配 │ │
│ │ - CoAP 协议 │ │ - 数据转换 │ │ - 规则执行 │ │
│ │ - Zigbee 网关 │ │ - 数据验证 │ │ - 动作触发 │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │ │ │
│ └──────────────────┼──────────────────┘ │
│ │ │
│ ┌────────▼────────┐ │
│ │ KaiwuDB 数据库 │ │
│ │ - 时序数据表 │ │
│ │ - 设备信息表 │ │
│ │ - 规则配置表 │ │
│ │ - 事件日志表 │ │
│ └─────────────────┘ │
└─────────────────────────────────────────────────────────────┘
3.3、项目结构
CodeArts 智能体将生成规范的项目目录,所有模块解耦设计,便于后续扩展与维护,核心项目结构如下:
smarthome-system/
├── README.md
├── docker-compose.yml # 系统部署配置
├── database/
│ ├── schema.sql # 数据库初始化脚本
│ └── sample_data.sql # 示例数据
├── device-adapter/
│ ├── mqtt_adapter.py # MQTT 协议适配器
│ ├── http_adapter.py # HTTP 协议适配器
│ ├── coap_adapter.py # CoAP 协议适配器
│ └── zigbee_adapter.py # Zigbee 网关适配器
├── data-processor/
│ ├── parser.py # 数据解析器
│ ├── cleaner.py # 数据清洗器
│ ├── transformer.py # 数据转换器
│ └── validator.py # 数据验证器
├── rule-engine/
│ ├── rule_manager.py # 规则管理器
│ ├── rule_executor.py # 规则执行器
│ ├── condition_matcher.py # 条件匹配器
│ └── action_trigger.py # 动作触发器
├── storage/
│ ├── timeseries_store.py # 时序数据存储
│ ├── relational_store.py # 关系数据存储
│ └── data_retention.py # 数据保留策略
├── web-api/
│ ├── app.py # Flask 应用
│ ├── device_api.py # 设备管理 API
│ ├── rule_api.py # 规则管理 API
│ └── data_api.py # 数据查询 API
└── scripts/
├── init_system.py # 系统初始化脚本
└── test_system.py # 系统测试脚本
3.4、KaiwuDB 核心库表设计
智能家居本地化数据处理系统时序表设计:
-- 创建时序数据库
CREATE TS DATABASE IF NOT EXISTS tsdb;
USE tsdb;
-- ============================================
-- 时序表设计
-- ============================================
-- 1. 传感器时序数据表
CREATE TABLE IF NOT EXISTS home_sensor (
ts TIMESTAMP NOT NULL,
value FLOAT,
status VARCHAR(32),
file_path VARCHAR(128)
) ATTRIBUTES (
device_id VARCHAR(64) NOT NULL,
data_type VARCHAR(32) NOT NULL,
location VARCHAR(32),
device_model VARCHAR(64)
) PRIMARY TAGS(device_id)
RETENTIONS 20D ACTIVETIME 12h;
-- 2. 设备控制指令时序表
CREATE TABLE IF NOT EXISTS device_commands (
ts TIMESTAMP NOT NULL,
command_type VARCHAR(64),
parameters varchar(64),
status VARCHAR(32),
executed_at TIMESTAMP,
error_message VARCHAR(256)
) ATTRIBUTES (
device_id VARCHAR(64) NOT NULL,
command_id VARCHAR(128),
user_id VARCHAR(64),
source VARCHAR(32)
) PRIMARY TAGS(device_id)
RETENTIONS 30D ACTIVETIME 1h;
-- 3. 规则执行历史时序表
CREATE TABLE IF NOT EXISTS rule_execution (
ts TIMESTAMP NOT NULL,
trigger_event VARCHAR(32),
conditions_matched VARCHAR(32),
actions_executed VARCHAR(32),
status VARCHAR(32),
execution_time_ms INT,
error_message VARCHAR(256)
) ATTRIBUTES (
rule_id INT NOT NULL,
rule_name VARCHAR(128) NOT NULL,
priority INT,
triggered_by VARCHAR(64)
) PRIMARY TAGS(rule_id)
RETENTIONS 60D ACTIVETIME 6h;
-- 4. 系统事件时序表
CREATE TABLE IF NOT EXISTS system_events (
ts TIMESTAMP NOT NULL,
event_type VARCHAR(64),
event_level VARCHAR(32),
message VARCHAR(512),
details VARCHAR(64)
) ATTRIBUTES (
source VARCHAR(64) NOT NULL,
device_id VARCHAR(64)
) PRIMARY TAGS(source)
RETENTIONS 30D ACTIVETIME 1h;
-- 5. 用户行为时序表
CREATE TABLE IF NOT EXISTS user_activity (
ts TIMESTAMP NOT NULL,
action_type VARCHAR(64),
device_id VARCHAR(64),
action_details VARCHAR(64)
) ATTRIBUTES (
user_id VARCHAR(64) NOT NULL,
session_id VARCHAR(128),
ip_address VARCHAR(64),
user_agent VARCHAR(256)
) PRIMARY TAGS(user_id)
RETENTIONS 90D ACTIVETIME 24h;
智能家居本地化数据处理系统关系表设计:
USE defaultdb;
-- 1. 设备管理表(关系型表)
CREATE TABLE IF NOT EXISTS devices (
id INT PRIMARY KEY,
name STRING NOT NULL,
device_id STRING NOT NULL UNIQUE, -- 设备唯一标识
device_type STRING NOT NULL,
protocol STRING NOT NULL,
location STRING,
status STRING DEFAULT 'online',
config JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_devices_device_id ON devices(device_id);
CREATE INDEX IF NOT EXISTS idx_devices_type ON devices(device_type);
CREATE INDEX IF NOT EXISTS idx_devices_location ON devices(location);
CREATE INDEX IF NOT EXISTS idx_devices_status ON devices(status);
-- 2. 自动化规则表(关系型表)
CREATE TABLE IF NOT EXISTS automation_rules (
id INT PRIMARY KEY,
name STRING NOT NULL,
description STRING,
enabled BOOL DEFAULT TRUE,
priority INT DEFAULT 0,
conditions JSONB NOT NULL,
actions JSONB NOT NULL,
created_by STRING DEFAULT 'system',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_triggered_at TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_rules_enabled ON automation_rules(enabled);
CREATE INDEX IF NOT EXISTS idx_rules_priority ON automation_rules(priority DESC);
-- 3. 数据聚合统计表(关系型表)
CREATE TABLE IF NOT EXISTS data_statistics (
id INT PRIMARY KEY,
device_id STRING NOT NULL,
data_type STRING NOT NULL,
stat_type STRING NOT NULL,
time_window STRING NOT NULL,
stat_value DECIMAL(20,6),
stat_time TIMESTAMP NOT NULL,
calculated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_data_stats_device ON data_statistics(device_id, data_type, stat_type, time_window, stat_time);
-- 4. 设备状态快照表(关系型表)
CREATE TABLE IF NOT EXISTS device_snapshots (
id INT PRIMARY KEY,
device_id STRING NOT NULL,
snapshot_time TIMESTAMP NOT NULL,
current_state JSONB,
battery_level INT,
signal_strength INT,
last_seen TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_device_snapshots_device ON device_snapshots(device_id, snapshot_time DESC);
-- ============================================
-- 创建序列
-- ============================================
CREATE SEQUENCE IF NOT EXISTS devices_seq START 1;
CREATE SEQUENCE IF NOT EXISTS automation_rules_seq START 1;
CREATE SEQUENCE IF NOT EXISTS data_statistics_seq START 1;
CREATE SEQUENCE IF NOT EXISTS device_snapshots_seq START 1;
智能家居本地化数据处理系统视图设计:
-- 创建视图
-- ============================================
CREATE VIEW IF NOT EXISTS v_device_latest_status AS
SELECT d.id, d.name, d.device_id, d.device_type, d.location, d.status, hs.ts as last_update, hs.value, hs.data_type, hs.status as sensor_status
FROM devices d
LEFT JOIN (
SELECT device_id, data_type, ts, value, status, ROW_NUMBER() OVER (PARTITION BY device_id ORDER BY ts DESC) as rn
FROM tsdb.home_sensor
) hs ON d.device_id = hs.device_id AND hs.rn = 1;
CREATE VIEW IF NOT EXISTS v_active_rules AS
SELECT id, name, description, priority, conditions, actions, last_triggered_at
FROM automation_rules
WHERE enabled = TRUE
ORDER BY priority DESC;
智能家居本地化数据处理系统留的数据接口:
POST /api/data/receive - 接收设备数据
POST /api/device/<id>/control - 控制设备
GET /api/devices - 获取设备列表
GET /api/sensor-data/latest - 获取最新传感器数据
GET /api/rules - 获取自动化规则
GET /api/statistics - 获取统计数据
发送传感器数据样例:
curl -X POST http://localhost:5001/api/data/receive \
-H "Content-Type: application/json" \
-d '{"device_id": "sensor_temp_livingroom", "value": 26.5, "data_type": "temperature", "unit": "°C"}'
控制设备样例:
curl -X POST http://localhost:5001/api/device/5/control \
-H "Content-Type: application/json" \
-d '{"command": "turn_on", "parameters": {"brightness": 80}}'
准备一些测试数据:
INSERT INTO devices (id, name, device_id, device_type, protocol, location, status, config) VALUES
(1, '客厅温度传感器', 'sensor_temp_livingroom', 'temperature', 'mqtt', '客厅', 'online', '{"topic": "sensors/livingroom/temp", "interval": 60}'),
(2, '客厅湿度传感器', 'sensor_humidity_livingroom', 'humidity', 'mqtt', '客厅', 'online', '{"topic": "sensors/livingroom/humidity", "interval": 60}'),
(3, '客厅人体传感器', 'sensor_motion_livingroom', 'motion', 'mqtt', '客厅', 'online', '{"topic": "sensors/livingroom/motion", "interval": 5}'),
(4, '客厅光照传感器', 'sensor_light_livingroom', 'light', 'mqtt', '客厅', 'online', '{"topic": "sensors/livingroom/light", "interval": 30}'),
(5, '客厅智能灯', 'device_light_livingroom', 'smart_light', 'mqtt', '客厅', 'online', '{"topic": "devices/livingroom/light", "supported_commands": ["turn_on", "turn_off", "set_brightness", "set_color"]}'),
(6, '卧室温度传感器', 'sensor_temp_bedroom', 'temperature', 'mqtt', '卧室', 'online', '{"topic": "sensors/bedroom/temp", "interval": 60}'),
(7, '卧室智能灯', 'device_light_bedroom', 'smart_light', 'mqtt', '卧室', 'online', '{"topic": "devices/bedroom/light", "supported_commands": ["turn_on", "turn_off", "set_brightness"]}'),
(8, '智能门锁', 'device_lock_entrance', 'smart_lock', 'mqtt', '玄关', 'online', '{"topic": "devices/entrance/lock", "supported_commands": ["lock", "unlock", "get_status"]}'),
(9, '智能窗帘', 'device_curtain_livingroom', 'smart_curtain', 'mqtt', '客厅', 'online', '{"topic": "devices/livingroom/curtain", "supported_commands": ["open", "close", "set_position"]}'),
(10, '智能空调', 'device_ac_livingroom', 'smart_ac', 'mqtt', '客厅', 'online', '{"topic": "devices/livingroom/ac", "supported_commands": ["turn_on", "turn_off", "set_temperature", "set_mode"]}');
-- ============================================
-- 插入示例自动化规则
-- ============================================
INSERT INTO automation_rules (id, name, description, enabled, priority, conditions, actions) VALUES
(1, '客厅有人自动开灯', '检测到客厅有人且光线低于阈值时自动开灯', TRUE, 10, '{"conditions": [{"device_id": "sensor_motion_livingroom", "type": "motion", "operator": "=", "value": true}, {"device_id": "sensor_light_livingroom", "type": "light", "operator": "<", "value": 100}], "logic": "AND"}', '{"actions": [{"device_id": "device_light_livingroom", "command": "turn_on", "parameters": {"brightness": 80}}]}'),
(2, '客厅无人自动关灯', '客厅无人超过 10 分钟自动关灯', TRUE, 5, '{"conditions": [{"device_id": "sensor_motion_livingroom", "type": "motion", "operator": "=", "value": false, "duration": 600}], "logic": "AND"}', '{"actions": [{"device_id": "device_light_livingroom", "command": "turn_off"}]}'),
(3, '温度过高自动开空调', '客厅温度超过 28 度自动开启空调', TRUE, 8, '{"conditions": [{"device_id": "sensor_temp_livingroom", "type": "temperature", "operator": ">", "value": 28}], "logic": "AND"}', '{"actions": [{"device_id": "device_ac_livingroom", "command": "turn_on", "parameters": {"temperature": 24, "mode": "cool"}}]}'),
(4, '离家模式', '离家时关闭所有灯光和空调', TRUE, 15, '{"conditions": [{"device_id": "device_lock_entrance", "type": "door_lock", "operator": "=", "value": "locked"}], "logic": "AND"}', '{"actions": [{"device_id": "device_light_livingroom", "command": "turn_off"}, {"device_id": "device_light_bedroom", "command": "turn_off"}, {"device_id": "device_ac_livingroom", "command": "turn_off"}]}');
整体项目结构其实挺复杂的,这里把核心代码贴上:
#!/usr/bin/env python3
"""
智能家居 Web 应用主程序
提供数据展示和控制界面
"""
import sys
import os
import json
import logging
from datetime import datetime, timedelta
from flask import Flask, render_template, jsonify, request
# 添加模块路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../data-processor'))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../rule-engine'))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../storage'))
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = Flask(__name__, template_folder='templates', static_folder='static')
# 模拟数据库连接
class MockDB:
"""模拟数据库连接"""
def execute_query(self, query, params=None):
"""执行查询"""
import subprocess
try:
cmd = ['docker', 'exec', 'kwdb-server', '/kaiwudb/bin/kwbase', 'sql', '--insecure', '--execute', query]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
lines = result.stdout.strip().split('\n')
if len(lines) > 1:
# 解析结果
headers = lines[0].split('\t')
data = []
for line in lines[1:]:
values = line.split('\t')
if len(values) == len(headers):
data.append(values)
return data
return []
except Exception as e:
logger.error(f"查询失败:{e}")
return []
db = MockDB()
@app.route('/')
def index():
"""首页"""
return render_template('index.html')
@app.route('/api/devices')
def get_devices():
"""获取设备列表"""
query = "SELECT id, name, device_id, device_type, protocol, location, status FROM devices ORDER BY id"
results = db.execute_query(query)
devices = []
if results and len(results) > 1:
headers = results[0]
for row in results[1:]:
device = {
'id': row[0],
'name': row[1],
'device_id': row[2],
'device_type': row[3],
'protocol': row[4],
'location': row[5],
'status': row[6]
}
devices.append(device)
return jsonify(devices)
@app.route('/api/devices/<int:device_id>/data')
def get_device_data(device_id):
"""获取设备数据"""
hours = request.args.get('hours', 24, type=int)
start_time = (datetime.now() - timedelta(hours=hours)).isoformat()
# 获取设备信息
device_query = f"SELECT device_id FROM devices WHERE id = {device_id}"
device_result = db.execute_query(device_query)
if not device_result or len(device_result) < 2:
return jsonify({'error': '设备不存在'}), 404
device_id_str = device_result[1][0]
# 获取传感器数据
data_query = f""" SELECT ts, value, data_type, status FROM sensor_data WHERE device_id = '{device_id_str}' AND ts >= '{start_time}' ORDER BY ts DESC LIMIT 100 """
results = db.execute_query(data_query)
data_points = []
if results and len(results) > 1:
for row in results[1:]:
data_points.append({
'timestamp': row[0],
'value': float(row[1]) if row[1] else None,
'data_type': row[2],
'status': row[3]
})
return jsonify({
'device_id': device_id,
'device_id_str': device_id_str,
'data': data_points
})
@app.route('/api/sensor-data/latest')
def get_latest_sensor_data():
"""获取最新传感器数据"""
query = """ SELECT d.id, d.name, d.device_type, d.location, d.status, sd.ts as last_update, sd.value, sd.data_type, sd.unit FROM devices d LEFT JOIN ( SELECT device_id, data_type, ts, value, unit, ROW_NUMBER() OVER (PARTITION BY device_id ORDER BY ts DESC) as rn FROM sensor_data ) sd ON d.device_id = sd.device_id AND sd.rn = 1 ORDER BY d.id """
results = db.execute_query(query)
sensor_data = []
if results and len(results) > 1:
for row in results[1:]:
sensor_data.append({
'id': row[0],
'name': row[1],
'device_type': row[2],
'location': row[3],
'status': row[4],
'last_update': row[5],
'value': float(row[6]) if row[6] else None,
'data_type': row[7],
'unit': row[8] if len(row) > 8 else None
})
return jsonify(sensor_data)
@app.route('/api/rules')
def get_rules():
"""获取自动化规则"""
query = "SELECT id, name, description, enabled, priority, last_triggered_at FROM automation_rules ORDER BY priority DESC"
results = db.execute_query(query)
rules = []
if results and len(results) > 1:
for row in results[1:]:
rules.append({
'id': row[0],
'name': row[1],
'description': row[2],
'enabled': row[3] == 't',
'priority': int(row[4]),
'last_triggered_at': row[5]
})
return jsonify(rules)
@app.route('/api/statistics')
def get_statistics():
"""获取统计数据"""
stats = {}
# 设备统计
device_query = "SELECT device_type, COUNT(*) as count, COUNT(CASE WHEN status = 'online' THEN 1 END) as online_count FROM devices GROUP BY device_type"
device_results = db.execute_query(device_query)
stats['devices'] = {}
if device_results and len(device_results) > 1:
for row in device_results[1:]:
stats['devices'][row[0]] = {
'total': int(row[1]),
'online': int(row[2])
}
# 传感器数据统计
data_query = "SELECT COUNT(*) as total_records FROM sensor_data"
data_results = db.execute_query(data_query)
if data_results and len(data_results) > 1:
stats['sensor_data'] = {
'total_records': int(data_results[1][0])
}
# 规则统计
rule_query = "SELECT COUNT(*) as total, COUNT(CASE WHEN enabled = TRUE THEN 1 END) as enabled FROM automation_rules"
rule_results = db.execute_query(rule_query)
if rule_results and len(rule_results) > 1:
stats['rules'] = {
'total': int(rule_results[1][0]),
'enabled': int(rule_results[1][1])
}
return jsonify(stats)
@app.route('/api/data/receive', methods=['POST'])
def receive_data():
"""接收设备数据"""
try:
data = request.json
# 验证数据
if not data or 'device_id' not in data or 'value' not in data:
return jsonify({'error': '缺少必要字段'}), 400
# 插入数据
device_id = data['device_id']
value = data['value']
data_type = data.get('data_type', 'value')
status = data.get('status', 'good')
unit = data.get('unit', '')
query = f""" INSERT INTO sensor_data (time, device_id, data_type, value, status, unit) VALUES (NOW(), '{device_id}', '{data_type}', {value}, '{status}', '{unit}') """
db.execute_query(query)
return jsonify({'success': True, 'message': '数据接收成功'})
except Exception as e:
logger.error(f"接收数据失败:{e}")
return jsonify({'error': str(e)}), 500
@app.route('/api/device/<int:device_id>/control', methods=['POST'])
def control_device(device_id):
"""控制设备"""
try:
data = request.json
if not data or 'command' not in data:
return jsonify({'error': '缺少命令参数'}), 400
command = data['command']
parameters = data.get('parameters', {})
# 获取设备信息
device_query = f"SELECT device_id, name FROM devices WHERE id = {device_id}"
device_result = db.execute_query(device_query)
if not device_result or len(device_result) < 2:
return jsonify({'error': '设备不存在'}), 404
device_id_str = device_result[1][0]
device_name = device_result[1][1]
# 记录控制指令
cmd_query = f""" INSERT INTO device_commands (time, device_id, command_type, parameters, status) VALUES (NOW(), '{device_id_str}', '{command}', '{json.dumps(parameters)}', 'success') """
db.execute_query(cmd_query)
logger.info(f"控制设备:{device_name} ({device_id_str}), 命令:{command}")
return jsonify({
'success': True,
'message': f'设备 {device_name} 控制指令已发送',
'command': command,
'parameters': parameters
})
except Exception as e:
logger.error(f"控制设备失败:{e}")
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5001, debug=True)
四、系统运行与功能测试
启动系统 Web API 服务,并通过接口测试与页面验证,确认系统所有功能正常运行。
1、启动系统 Web API 服务
在命令行输入启动命令并运行:
cd /root/workspace/smarthome-system/web-api && python3 simple_app.py > /tmp/smarthome-web-5002.log 2>&1 &
启动成功后,终端将显示:Running on http://[你的服务器 IP]:5002,即服务启动完成。
系统概览:设备总数、在线设备数、自动化规则数、数据记录数及设备列表。
实时数据:温湿度、光照、人体感应等传感器的最新数据与更新时间;自动化规则:所有规则的名称、描述、启用状态、优先级。
系统状态:KaiwuDB 版本、服务运行状态、接口文档。
2、测试接口
通过数据接口插入客厅传感器数据:
curl -X POST http://localhost:5002/api/data/receive -H "Content-Type: application/json" -d '{"device_id": "temperature", "value": 32, "data_type": "temperature", "unit": "°C"}'
将传感器实时数据上报至本地系统,数据自动存储至 KaiwuDB 时序表。
测试一下测试设备列表 API:
curl -s http://localhost:5002/api/devices
可以看到返回了列表数据。
五、总结
本文以 KaiwuDB 为数据基座,CodeArts 代码智能体为自动化开发引擎,实现了智能家居本地化数据处理系统的快速构建,全程无需复杂的手动编码,新手也可按步骤复现。随着 AIoT 技术的发展,本地化数据处理将成为智能家居的主流趋势,KaiwuDB 多模数据库让开发者实现'轻量、高效、安全'的智能场景落地。

