跳到主要内容 基于 KaiwuDB 与 CodeArts 智能体的智能家居本地化数据处理方案 | 极客日志
Python AI 算法
基于 KaiwuDB 与 CodeArts 智能体的智能家居本地化数据处理方案 针对智能家居云端数据处理的延迟与隐私问题,介绍基于 KaiwuDB 多模时序数据库与 CodeArts 代码智能体的本地化解决方案。通过 Docker 自动化部署数据库,利用 AI 辅助生成设备接入、数据处理及规则引擎模块代码,实现零云端依赖、低延迟且高隐私的智能家居系统。涵盖环境配置、数据库设计、API 开发及测试全流程,降低技术门槛,适配新手开发者。
云间运维 发布于 2026/4/5 更新于 2026/4/13 1 浏览一、环境准备
1.1、配置 VSCode 远程链接服务器
所有操作在 VSCode 下完成。
1.1.1、首先在 VSCode 下载安装 Remote-SSH 扩展,打开 VSCode,在扩展里搜索 remote-ssh,然后点击安装。
1.1.2、链接远程服务器
点击 remote-ssh 扩展,点击加号,然后按着提示输入:ssh root@[服务器 IP],这里服务器替换成自己的 IP 与用户,然后回车确认。
选择 SSH 信息存放的配置文件。
至此配置完成,点击登录。
因为我们是 Linux 的服务器,这里选择 Linux。
输入密码然后回车完成。
至此链接远程服务器成功。
1.2、准备 CodeArts 代码智能体 CodeArts 代码智能体是基于智能生成、智能问答两大核心能力构建起一套全方位、多层次的智能开发体系。在智能生成方面,它能够依据开发者输入的需求描述,准确且高效地生成高质量代码。
1.2.1、安装 CodeArts 扩展
在 VSCode 下载安装 CodeArts 代码智能体,在扩展里搜索 CodeArts,然后点击安装。
1.2.2、CodeArts 登录
安装完成后点击 CodeArts 图标,然后点击登录。
然后登录华为云账号进行登录。
登录成功后如下。
在 CodeArts 代码智能体对话框下侧点 Agent 模式,在弹出的菜单最后侧点设置按钮,进入授权所有自动化操作界面,分别勾选读取文件和目录 、编辑文件 、执行命令 、更新代办、执行 task 工具、使用浏览器 。
至此 CodeArts 代码智能体已经准备完成了,下面全部交给 AI 帮我们部署数据库,开发部署应用。
二、数据库自动部署 借助 CodeArts 代码智能体的自动化能力,通过自然语言指令即可完成 KaiwuDB 的 Docker 化部署,全程无需手动编写部署脚本。
在 CodeArts 代码智能体里输入提示词:帮我使用 docker 部署一个最新版的 kwdb,然后等待数据库部署。
等待十几分钟后我们来看成果,可以看到数据库已经部署成功了。
docker exec -it kwdb-server bash
连接到 kwdb 的 SQL 客户端查看下数据库版本,及数据的基础功能是否正常,若终端显示 KaiwuDB 版本号(如 3.1.0),且容器状态为「Up」,则说明部署成功。
docker exec -it kwdb-server /kaiwudb/bin/kwbase sql --insecure
部署后可通过以下命令管理 KWDB 容器,方便后续运维:
docker ps --filter "name=kwdb-server"
docker logs -f kwdb-server
docker exec -it kwdb-server bash
docker exec -it kwdb-server /kaiwudb/bin/kwbase sql --insecure
docker-compose down
docker-compose up -d
三、智能家居本地化数据处理系统开发
3.1 发送开发指令,启动自动化构建 基于已部署的 KaiwuDB,通过 CodeArts 智能体指令驱动,自动完成智能家居本地化数据处理系统的全模块开发,包括设备接入、数据处理、规则引擎、本地存储、Web API 等核心模块。
先构造好提示词如下,在 CodeArts 智能体模式下,输入以下 Prompt 指令,引导智能体完成应用开发:
使用上面部署的 kwdb 多模时序数据库请为我设计一套「智能家居本地化数据处理系统」,核心目标是:
1. 所有设备数据(如传感器数据、设备控制指令、用户行为数据)均在本地处理,不上传至云端,保障用户隐私;
2. 系统具备低延迟、高稳定性,支持断网情况下智能家居设备正常联动;
3. 适配常见智能家居设备(如温湿度传感器、人体红外传感器、智能灯、智能窗帘、智能空调、智能门锁);
4. 支持简单的自动化规则自定义(如'当检测到有人且光线低于阈值时自动开灯' )。
5. 核心功能模块设计:
- 设备接入模块:如何适配不同协议的设备,实现数据采集;
- 本地化数据处理模块:实时数据解析、清洗、转换的逻辑;
- 规则引擎模块:用户自定义自动化规则的存储与执行逻辑;
- 本地存储模块:数据存储策略(如哪些数据需要时序表、哪些数据持久化关系表、存储周期);
此时可以看到,智能体开始自动构建任务,生成开发方案、编写代码。
3.2、系统整体架构 系统采用分层架构设计,所有模块均基于本地运行,数据全量存储在 KaiwuDB,实现零云端依赖。
┌─────────────────────────────────────────────────────────────┐
│ 智能家居本地化数据处理系统 │
├─────────────────────────────────────────────────────────────┤
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 设备接入层 │ │ 数据处理层 │ │ 规则引擎层 │ │
│ │ - MQTT 协议 │ │ - 数据解析 │ │ - 规则存储 │ │
│ │ - HTTP 协议 │ │ - 数据清洗 │ │ - 规则匹配 │ │
│ │ - CoAP 协议 │ │ - 数据转换 │ │ - 规则执行 │ │
│ │ - Zigbee 网关 │ │ - 数据验证 │ │ - 动作触发 │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │ │ │
│ └──────────────────┼──────────────────┘ │
│ │ │
│ ┌────────▼────────┐ │
│ │ KaiwuDB 数据库 │ │
│ │ - 时序数据表 │ │
│ │ - 设备信息表 │ │
│ │ - 规则配置表 │ │
│ │ - 事件日志表 │ │
│ └─────────────────┘ │
└─────────────────────────────────────────────────────────────┘
3.3、项目结构 CodeArts 智能体将生成规范的项目目录,所有模块解耦设计,便于后续扩展与维护,核心项目结构如下:
smarthome-system/
├── README.md
├── docker-compose.yml
├── database/
│ ├── schema .sql
│ └── sample_data.sql
├── device-adapter/
│ ├── mqtt_adapter.py
│ ├── http_adapter.py
│ ├── coap_adapter.py
│ └── zigbee_adapter.py
├── data-processor/
│ ├── parser.py
│ ├── cleaner.py
│ ├── transformer.py
│ └── validator.py
├── rule-engine/
│ ├── rule_manager.py
│ ├── rule_executor.py
│ ├── condition_matcher.py
│ └── action_trigger.py
├── storage/
│ ├── timeseries_store.py
│ ├── relational_store.py
│ └── data_retention.py
├── web-api/
│ ├── app.py
│ ├── device_api.py
│ ├── rule_api.py
│ └── data_api.py
└── scripts/
├── init_system.py
└── test_system.py
3.4、KaiwuDB 核心库表设计
CREATE TS DATABASE IF NOT EXISTS tsdb;
USE tsdb;
CREATE TABLE IF NOT EXISTS home_sensor (
ts TIMESTAMP NOT NULL ,
value FLOAT ,
status VARCHAR (32 ),
file_path VARCHAR (128 )
) ATTRIBUTES (
device_id VARCHAR (64 ) NOT NULL ,
data_type VARCHAR (32 ) NOT NULL ,
location VARCHAR (32 ),
device_model VARCHAR (64 )
) PRIMARY TAGS(device_id)
RETENTIONS 20 D ACTIVETIME 12 h;
CREATE TABLE IF NOT EXISTS device_commands (
ts TIMESTAMP NOT NULL ,
command_type VARCHAR (64 ),
parameters varchar (64 ),
status VARCHAR (32 ),
executed_at TIMESTAMP ,
error_message VARCHAR (256 )
) ATTRIBUTES (
device_id VARCHAR (64 ) NOT NULL ,
command_id VARCHAR (128 ),
user_id VARCHAR (64 ),
source VARCHAR (32 )
) PRIMARY TAGS(device_id)
RETENTIONS 30 D ACTIVETIME 1 h;
CREATE TABLE IF NOT EXISTS rule_execution (
ts TIMESTAMP NOT NULL ,
trigger_event VARCHAR (32 ),
conditions_matched VARCHAR (32 ),
actions_executed VARCHAR (32 ),
status VARCHAR (32 ),
execution_time_ms INT ,
error_message VARCHAR (256 )
) ATTRIBUTES (
rule_id INT NOT NULL ,
rule_name VARCHAR (128 ) NOT NULL ,
priority INT ,
triggered_by VARCHAR (64 )
) PRIMARY TAGS(rule_id)
RETENTIONS 60 D ACTIVETIME 6 h;
CREATE TABLE IF NOT EXISTS system_events (
ts TIMESTAMP NOT NULL ,
event_type VARCHAR (64 ),
event_level VARCHAR (32 ),
message VARCHAR (512 ),
details VARCHAR (64 )
) ATTRIBUTES (
source VARCHAR (64 ) NOT NULL ,
device_id VARCHAR (64 )
) PRIMARY TAGS(source)
RETENTIONS 30 D ACTIVETIME 1 h;
CREATE TABLE IF NOT EXISTS user_activity (
ts TIMESTAMP NOT NULL ,
action_type VARCHAR (64 ),
device_id VARCHAR (64 ),
action_details VARCHAR (64 )
) ATTRIBUTES (
user_id VARCHAR (64 ) NOT NULL ,
session_id VARCHAR (128 ),
ip_address VARCHAR (64 ),
user_agent VARCHAR (256 )
) PRIMARY TAGS(user_id)
RETENTIONS 90 D ACTIVETIME 24 h;
USE defaultdb;
CREATE TABLE IF NOT EXISTS devices (
id INT PRIMARY KEY ,
name STRING NOT NULL ,
device_id STRING NOT NULL UNIQUE ,
device_type STRING NOT NULL ,
protocol STRING NOT NULL ,
location STRING,
status STRING DEFAULT 'online' ,
config JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_devices_device_id ON devices(device_id);
CREATE INDEX IF NOT EXISTS idx_devices_type ON devices(device_type);
CREATE INDEX IF NOT EXISTS idx_devices_location ON devices(location);
CREATE INDEX IF NOT EXISTS idx_devices_status ON devices(status);
CREATE TABLE IF NOT EXISTS automation_rules (
id INT PRIMARY KEY ,
name STRING NOT NULL ,
description STRING,
enabled BOOL DEFAULT TRUE ,
priority INT DEFAULT 0 ,
conditions JSONB NOT NULL ,
actions JSONB NOT NULL ,
created_by STRING DEFAULT 'system' ,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ,
last_triggered_at TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_rules_enabled ON automation_rules(enabled);
CREATE INDEX IF NOT EXISTS idx_rules_priority ON automation_rules(priority DESC );
CREATE TABLE IF NOT EXISTS data_statistics (
id INT PRIMARY KEY ,
device_id STRING NOT NULL ,
data_type STRING NOT NULL ,
stat_type STRING NOT NULL ,
time_window STRING NOT NULL ,
stat_value DECIMAL (20 ,6 ),
stat_time TIMESTAMP NOT NULL ,
calculated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_data_stats_device ON data_statistics(device_id, data_type, stat_type, time_window, stat_time);
CREATE TABLE IF NOT EXISTS device_snapshots (
id INT PRIMARY KEY ,
device_id STRING NOT NULL ,
snapshot_time TIMESTAMP NOT NULL ,
current_state JSONB,
battery_level INT ,
signal_strength INT ,
last_seen TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_device_snapshots_device ON device_snapshots(device_id, snapshot_time DESC );
CREATE SEQUENCE IF NOT EXISTS devices_seq START 1 ;
CREATE SEQUENCE IF NOT EXISTS automation_rules_seq START 1 ;
CREATE SEQUENCE IF NOT EXISTS data_statistics_seq START 1 ;
CREATE SEQUENCE IF NOT EXISTS device_snapshots_seq START 1 ;
CREATE VIEW IF NOT EXISTS v_device_latest_status AS
SELECT d.id, d.name, d.device_id, d.device_type, d.location, d.status, hs.ts as last_update, hs.value, hs.data_type, hs.status as sensor_status
FROM devices d
LEFT JOIN (
SELECT device_id, data_type, ts, value , status, ROW_NUMBER () OVER (PARTITION BY device_id ORDER BY ts DESC ) as rn
FROM tsdb.home_sensor
) hs ON d.device_id = hs.device_id AND hs.rn = 1 ;
CREATE VIEW IF NOT EXISTS v_active_rules AS
SELECT id, name, description, priority, conditions, actions, last_triggered_at
FROM automation_rules
WHERE enabled = TRUE
ORDER BY priority DESC ;
POST /api/data/receive - 接收设备数据
POST /api/device/<id >/control - 控制设备
GET /api/devices - 获取设备列表
GET /api/sensor-data/latest - 获取最新传感器数据
GET /api/rules - 获取自动化规则
GET /api/statistics - 获取统计数据
curl -X POST http://localhost:5001/api/data/receive \
-H "Content-Type: application/json" \
-d '{"device_id": "sensor_temp_livingroom", "value": 26.5, "data_type": "temperature", "unit": "°C"}'
curl -X POST http://localhost:5001/api/device/5/control \
-H "Content-Type: application/json" \
-d '{"command": "turn_on", "parameters": {"brightness": 80}}'
INSERT INTO devices (id, name, device_id, device_type, protocol, location, status, config) VALUES
(1 , '客厅温度传感器' , 'sensor_temp_livingroom' , 'temperature' , 'mqtt' , '客厅' , 'online' , '{"topic": "sensors/livingroom/temp", "interval": 60}' ),
(2 , '客厅湿度传感器' , 'sensor_humidity_livingroom' , 'humidity' , 'mqtt' , '客厅' , 'online' , '{"topic": "sensors/livingroom/humidity", "interval": 60}' ),
(3 , '客厅人体传感器' , 'sensor_motion_livingroom' , 'motion' , 'mqtt' , '客厅' , 'online' , '{"topic": "sensors/livingroom/motion", "interval": 5}' ),
(4 , '客厅光照传感器' , 'sensor_light_livingroom' , 'light' , 'mqtt' , '客厅' , 'online' , '{"topic": "sensors/livingroom/light", "interval": 30}' ),
(5 , '客厅智能灯' , 'device_light_livingroom' , 'smart_light' , 'mqtt' , '客厅' , 'online' , '{"topic": "devices/livingroom/light", "supported_commands": ["turn_on", "turn_off", "set_brightness", "set_color"]}' ),
(6 , '卧室温度传感器' , 'sensor_temp_bedroom' , 'temperature' , 'mqtt' , '卧室' , 'online' , '{"topic": "sensors/bedroom/temp", "interval": 60}' ),
(7 , '卧室智能灯' , 'device_light_bedroom' , 'smart_light' , 'mqtt' , '卧室' , 'online' , '{"topic": "devices/bedroom/light", "supported_commands": ["turn_on", "turn_off", "set_brightness"]}' ),
(8 , '智能门锁' , 'device_lock_entrance' , 'smart_lock' , 'mqtt' , '玄关' , 'online' , '{"topic": "devices/entrance/lock", "supported_commands": ["lock", "unlock", "get_status"]}' ),
(9 , '智能窗帘' , 'device_curtain_livingroom' , 'smart_curtain' , 'mqtt' , '客厅' , 'online' , '{"topic": "devices/livingroom/curtain", "supported_commands": ["open", "close", "set_position"]}' ),
(10 , '智能空调' , 'device_ac_livingroom' , 'smart_ac' , 'mqtt' , '客厅' , 'online' , '{"topic": "devices/livingroom/ac", "supported_commands": ["turn_on", "turn_off", "set_temperature", "set_mode"]}' );
INSERT INTO automation_rules (id, name, description, enabled, priority, conditions, actions) VALUES
(1 , '客厅有人自动开灯' , '检测到客厅有人且光线低于阈值时自动开灯' , TRUE , 10 , '{"conditions": [{"device_id": "sensor_motion_livingroom", "type": "motion", "operator": "=", "value": true}, {"device_id": "sensor_light_livingroom", "type": "light", "operator": "<", "value": 100}], "logic": "AND"}' , '{"actions": [{"device_id": "device_light_livingroom", "command": "turn_on", "parameters": {"brightness": 80}}]}' ),
(2 , '客厅无人自动关灯' , '客厅无人超过 10 分钟自动关灯' , TRUE , 5 , '{"conditions": [{"device_id": "sensor_motion_livingroom", "type": "motion", "operator": "=", "value": false, "duration": 600}], "logic": "AND"}' , '{"actions": [{"device_id": "device_light_livingroom", "command": "turn_off"}]}' ),
(3 , '温度过高自动开空调' , '客厅温度超过 28 度自动开启空调' , TRUE , 8 , '{"conditions": [{"device_id": "sensor_temp_livingroom", "type": "temperature", "operator": ">", "value": 28}], "logic": "AND"}' , '{"actions": [{"device_id": "device_ac_livingroom", "command": "turn_on", "parameters": {"temperature": 24, "mode": "cool"}}]}' ),
(4 , '离家模式' , '离家时关闭所有灯光和空调' , TRUE , 15 , '{"conditions": [{"device_id": "device_lock_entrance", "type": "door_lock", "operator": "=", "value": "locked"}], "logic": "AND"}' , '{"actions": [{"device_id": "device_light_livingroom", "command": "turn_off"}, {"device_id": "device_light_bedroom", "command": "turn_off"}, {"device_id": "device_ac_livingroom", "command": "turn_off"}]}' );
"""
智能家居 Web 应用主程序
提供数据展示和控制界面
"""
import sys
import os
import json
import logging
from datetime import datetime, timedelta
from flask import Flask, render_template, jsonify, request
sys.path.insert(0 , os.path.join(os.path.dirname(__file__), '../data-processor' ))
sys.path.insert(0 , os.path.join(os.path.dirname(__file__), '../rule-engine' ))
sys.path.insert(0 , os.path.join(os.path.dirname(__file__), '../storage' ))
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = Flask(__name__, template_folder='templates' , static_folder='static' )
class MockDB :
"""模拟数据库连接"""
def execute_query (self, query, params=None ):
"""执行查询"""
import subprocess
try :
cmd = ['docker' , 'exec' , 'kwdb-server' , '/kaiwudb/bin/kwbase' , 'sql' , '--insecure' , '--execute' , query]
result = subprocess.run(cmd, capture_output=True , text=True )
if result.returncode == 0 :
lines = result.stdout.strip().split('\n' )
if len (lines) > 1 :
headers = lines[0 ].split('\t' )
data = []
for line in lines[1 :]:
values = line.split('\t' )
if len (values) == len (headers):
data.append(values)
return data
return []
except Exception as e:
logger.error(f"查询失败:{e} " )
return []
db = MockDB()
@app.route('/' )
def index ():
"""首页"""
return render_template('index.html' )
@app.route('/api/devices' )
def get_devices ():
"""获取设备列表"""
query = "SELECT id, name, device_id, device_type, protocol, location, status FROM devices ORDER BY id"
results = db.execute_query(query)
devices = []
if results and len (results) > 1 :
headers = results[0 ]
for row in results[1 :]:
device = {
'id' : row[0 ],
'name' : row[1 ],
'device_id' : row[2 ],
'device_type' : row[3 ],
'protocol' : row[4 ],
'location' : row[5 ],
'status' : row[6 ]
}
devices.append(device)
return jsonify(devices)
@app.route('/api/devices/<int:device_id>/data' )
def get_device_data (device_id ):
"""获取设备数据"""
hours = request.args.get('hours' , 24 , type =int )
start_time = (datetime.now() - timedelta(hours=hours)).isoformat()
device_query = f"SELECT device_id FROM devices WHERE id = {device_id} "
device_result = db.execute_query(device_query)
if not device_result or len (device_result) < 2 :
return jsonify({'error' : '设备不存在' }), 404
device_id_str = device_result[1 ][0 ]
data_query = f""" SELECT ts, value, data_type, status FROM sensor_data WHERE device_id = '{device_id_str} ' AND ts >= '{start_time} ' ORDER BY ts DESC LIMIT 100 """
results = db.execute_query(data_query)
data_points = []
if results and len (results) > 1 :
for row in results[1 :]:
data_points.append({
'timestamp' : row[0 ],
'value' : float (row[1 ]) if row[1 ] else None ,
'data_type' : row[2 ],
'status' : row[3 ]
})
return jsonify({
'device_id' : device_id,
'device_id_str' : device_id_str,
'data' : data_points
})
@app.route('/api/sensor-data/latest' )
def get_latest_sensor_data ():
"""获取最新传感器数据"""
query = """ SELECT d.id, d.name, d.device_type, d.location, d.status, sd.ts as last_update, sd.value, sd.data_type, sd.unit FROM devices d LEFT JOIN ( SELECT device_id, data_type, ts, value, unit, ROW_NUMBER() OVER (PARTITION BY device_id ORDER BY ts DESC) as rn FROM sensor_data ) sd ON d.device_id = sd.device_id AND sd.rn = 1 ORDER BY d.id """
results = db.execute_query(query)
sensor_data = []
if results and len (results) > 1 :
for row in results[1 :]:
sensor_data.append({
'id' : row[0 ],
'name' : row[1 ],
'device_type' : row[2 ],
'location' : row[3 ],
'status' : row[4 ],
'last_update' : row[5 ],
'value' : float (row[6 ]) if row[6 ] else None ,
'data_type' : row[7 ],
'unit' : row[8 ] if len (row) > 8 else None
})
return jsonify(sensor_data)
@app.route('/api/rules' )
def get_rules ():
"""获取自动化规则"""
query = "SELECT id, name, description, enabled, priority, last_triggered_at FROM automation_rules ORDER BY priority DESC"
results = db.execute_query(query)
rules = []
if results and len (results) > 1 :
for row in results[1 :]:
rules.append({
'id' : row[0 ],
'name' : row[1 ],
'description' : row[2 ],
'enabled' : row[3 ] == 't' ,
'priority' : int (row[4 ]),
'last_triggered_at' : row[5 ]
})
return jsonify(rules)
@app.route('/api/statistics' )
def get_statistics ():
"""获取统计数据"""
stats = {}
device_query = "SELECT device_type, COUNT(*) as count, COUNT(CASE WHEN status = 'online' THEN 1 END) as online_count FROM devices GROUP BY device_type"
device_results = db.execute_query(device_query)
stats['devices' ] = {}
if device_results and len (device_results) > 1 :
for row in device_results[1 :]:
stats['devices' ][row[0 ]] = {
'total' : int (row[1 ]),
'online' : int (row[2 ])
}
data_query = "SELECT COUNT(*) as total_records FROM sensor_data"
data_results = db.execute_query(data_query)
if data_results and len (data_results) > 1 :
stats['sensor_data' ] = {
'total_records' : int (data_results[1 ][0 ])
}
rule_query = "SELECT COUNT(*) as total, COUNT(CASE WHEN enabled = TRUE THEN 1 END) as enabled FROM automation_rules"
rule_results = db.execute_query(rule_query)
if rule_results and len (rule_results) > 1 :
stats['rules' ] = {
'total' : int (rule_results[1 ][0 ]),
'enabled' : int (rule_results[1 ][1 ])
}
return jsonify(stats)
@app.route('/api/data/receive' , methods=['POST' ] )
def receive_data ():
"""接收设备数据"""
try :
data = request.json
if not data or 'device_id' not in data or 'value' not in data:
return jsonify({'error' : '缺少必要字段' }), 400
device_id = data['device_id' ]
value = data['value' ]
data_type = data.get('data_type' , 'value' )
status = data.get('status' , 'good' )
unit = data.get('unit' , '' )
query = f""" INSERT INTO sensor_data (time, device_id, data_type, value, status, unit) VALUES (NOW(), '{device_id} ', '{data_type} ', {value} , '{status} ', '{unit} ') """
db.execute_query(query)
return jsonify({'success' : True , 'message' : '数据接收成功' })
except Exception as e:
logger.error(f"接收数据失败:{e} " )
return jsonify({'error' : str (e)}), 500
@app.route('/api/device/<int:device_id>/control' , methods=['POST' ] )
def control_device (device_id ):
"""控制设备"""
try :
data = request.json
if not data or 'command' not in data:
return jsonify({'error' : '缺少命令参数' }), 400
command = data['command' ]
parameters = data.get('parameters' , {})
device_query = f"SELECT device_id, name FROM devices WHERE id = {device_id} "
device_result = db.execute_query(device_query)
if not device_result or len (device_result) < 2 :
return jsonify({'error' : '设备不存在' }), 404
device_id_str = device_result[1 ][0 ]
device_name = device_result[1 ][1 ]
cmd_query = f""" INSERT INTO device_commands (time, device_id, command_type, parameters, status) VALUES (NOW(), '{device_id_str} ', '{command} ', '{json.dumps(parameters)} ', 'success') """
db.execute_query(cmd_query)
logger.info(f"控制设备:{device_name} ({device_id_str} ), 命令:{command} " )
return jsonify({
'success' : True ,
'message' : f'设备 {device_name} 控制指令已发送' ,
'command' : command,
'parameters' : parameters
})
except Exception as e:
logger.error(f"控制设备失败:{e} " )
return jsonify({'error' : str (e)}), 500
if __name__ == '__main__' :
app.run(host='0.0.0.0' , port=5001 , debug=True )
四、系统运行与功能测试 启动系统 Web API 服务,并通过接口测试 与页面验证 ,确认系统所有功能正常运行。
1、启动系统 Web API 服务 cd /root/workspace/smarthome-system/web-api && python3 simple_app.py > /tmp/smarthome-web-5002.log 2>&1 &
启动成功后,终端将显示:Running on http://[你的服务器 IP]:5002,即服务启动完成。
系统概览:设备总数、在线设备数、自动化规则数、数据记录数及设备列表。
实时数据:温湿度、光照、人体感应等传感器的最新数据与更新时间;自动化规则:所有规则的名称、描述、启用状态、优先级。
系统状态:KaiwuDB 版本、服务运行状态、接口文档。
2、测试接口 curl -X POST http://localhost:5002/api/data/receive -H "Content-Type: application/json" -d '{"device_id": "temperature", "value": 32, "data_type": "temperature", "unit": "°C"}'
将传感器实时数据上报至本地系统,数据自动存储至 KaiwuDB 时序表。
curl -s http://localhost:5002/api/devices
五、总结 本文以 KaiwuDB 为数据基座,CodeArts 代码智能体为自动化开发引擎,实现了智能家居本地化数据处理系统的快速构建,全程无需复杂的手动编码,新手也可按步骤复现。随着 AIoT 技术的发展,本地化数据处理将成为智能家居的主流趋势,KaiwuDB 多模数据库让开发者实现'轻量、高效、安全'的智能场景落地。
微信扫一扫,关注极客日志 微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
相关免费在线工具 加密/解密文本 使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
RSA密钥对生成器 生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
Mermaid 预览与可视化编辑 基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
curl 转代码 解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online