跳到主要内容
KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统 | 极客日志
Python
KaiwuDB+CodeArts 智能体,让ai快速构建一个智能家居本地化数据处理系统 针对智能家居云端数据处理模式的网络依赖、低延迟性差、隐私泄露三大痛点,基于 KaiwuDB(KWDB)多模时序数据库 + 华为 CodeArts 代码智能体的本地化数据处理解决方案。从环境搭建、KWDB 自动化部署,到系统全模块开发、接口测试实现全流程落地,打造零云端依赖、低延迟、高隐私的智能家居本地化数据处理系统,方案基于开源技术栈与自动化开发工具,降低技术门槛,适配新手开发者与实际家庭场景需求…
雪落无声 发布于 2026/4/7 更新于 2026/5/26 54K 浏览针对智能家居云端数据处理模式的网络依赖、低延迟性差、隐私泄露三大痛点,基于 KaiwuDB(KWDB)多模时序数据库 + 华为 CodeArts 代码智能体的本地化数据处理解决方案。从环境搭建、KWDB 自动化部署,到系统全模块开发、接口测试实现全流程落地,打造零云端依赖、低延迟、高隐私的智能家居本地化数据处理系统,方案基于开源技术栈与自动化开发工具,降低技术门槛,适配新手开发者与实际家庭场景需求。
随着智能家居设备渗透率持续提升,家庭中温湿度传感器、智能灯、空调、门锁等设备呈规模化增长,设备运行产生的时序数据 (温湿度、能耗、设备状态)与关系型数据 (设备信息、规则配置)呈爆发式增长,对数据的存储、处理与利用提出更高要求。
本文选择KaiwuDB 作为本地化数据存储与计算核心,华为 CodeArts 代码智能体 作为自动化研发引擎,二者结合实现智能家居本地化数据处理系统的高效构建,核心优势如下:
1.1 KaiwuDB:适配 AIoT 场景的多模数据库基座
KaiwuDB(开源版本简称 KWDB)是开放原子开源基金会孵化的分布式多模数据库,专为 AIoT 场景设计,核心特性完美匹配智能家居本地化需求:
多模融合 :支持时序库与关系库统一实例管理,无需多库部署,一站式处理智能家居时序数据与关系型数据,简化架构与运维成本;
极致性能 :就地计算、智能预计算技术实现百万级数据秒级写入、亿级数据秒级读取,单机支持 1 秒 20 亿记录数据探索,满足设备高频采集与低延迟查询需求;
本地高可靠 :支持离线运行、断网可用,数据全程本地留存,从根源保障隐私;特有压缩算法节省 90% 时序数据存储空间,降低本地硬件成本;
高兼容性 :支持标准 SQL 语法与主流数据库工具,协议插件框架可快速适配 MQTT、Modbus 等智能家居设备协议,扩展性强。
1.2 华为 CodeArts 代码智能体:自动化研发效率引擎
华为云 CodeArts 代码智能体融合 AI 大模型与 IDE 工具,实现自然语言到代码的全流程转化,核心能力如下:
自然语言转开发 :将业务需求直接转化为结构化开发步骤,自动完成代码生成、配置编写、接口开发,非专业开发者也可参与系统构建;
环境自动适配 :自主调用 Shell 命令、代码搜索等工具,完成 KaiwuDB 部署、表结构设计、硬件环境适配,大幅减少手动操作;
代码质量保障 :内置代码规范检查、漏洞扫描、单元测试生成,确保代码符合工业级标准,支持多人协同与快速迭代;
全流程自动化 :从数据库部署到系统开发、测试,全程通过自然语言指令驱动,大幅缩短研发周期。
下面将详细介绍系统搭建的完整流程,从环境准备到系统测试一步到位
一、环境准备
1.1、配置vscode远程链接服务器
CodeArts代码智能体自动部署kaiwudb与自动化构建智能家居本地化数据处理系统所有操作在vscode下完成
1.1.1、首先在vscode下载安装 remote-ssh扩展,打开vscode,在扩展里搜索remote-ssh,然后点击安装
1.1.2、链接远程服务器
点击remote-ssh扩展,点击加号,然后按着提示输入:ssh [email protected] ,这里服务器替换成自己的ip与用户,然后回车确认
2.2、准备CodeArts代码智能体 CodeArts代码智能体是基于智能生成、智能问答两大核心能力构建起一套全方位、多层次的智能开发体系。在智能生成方面,它能够依据开发者输入的需求描述,准确且高效地生成高质量代码;
在vscode下载安装 CodeArts代码智能体,在扩展里搜索CodeArts,然后点击安装
在CodeArts代码智能体对话框下侧点Agent模式,在弹出的菜单最后侧点设置按钮,进入授权所有自动化操作界面,分别勾选读取文件和目录 、编辑文件 、执行命令 、更新代办、执行task工具、使用浏览器 。
至此CodeArts代码智能体已经准备完成了,下面全部交给ai帮我们部署数据库,开发部署应用
二、数据库自动部署 借助 CodeArts 代码智能体的自动化能力,通过自然语言指令即可完成 KaiwuDB 的 Docker 化部署,全程无需手动编写部署脚本
在CodeArts代码智能体里输入提示词:帮我使用docker部署一个最新版的kwdb,然后等待数据库部署
等待十几分钟后我们来看成果,可以看到数据库已经部署成功了
docker exec -it kwdb-server bash
连接到kwdb的SQL客户端查看下数据库版本,及数据的基础功能是否正常,若终端显示 KaiwuDB 版本号(如 3.1.0),且容器状态为「Up」,则说明部署成功。
docker exec - it kwdb- server / kaiwudb/ bin/ kwbase sql
部署后可通过以下命令管理 KWDB 容器,方便后续运维
# 查看容器状态 docker ps --filter "name=kwdb-server"
三、智能家居本地化数据处理系统开发
3.1 发送开发指令,启动自动化构建 基于已部署的 KaiwuDB,通过 CodeArts 智能体指令驱动,自动完成智能家居本地化数据处理系统的全模块开发,包括设备接入、数据处理、规则引擎、本地存储、Web API 等核心模块。
先构造好提示词如下,在 CodeArts 智能体模式下,输入以下 Prompt 指令,引导智能体完成应用开发::
使用上面部署的kwdb多模时序数据库请为我设计一套「智能家居本地化数据处理系统」,核心目标是: 1. 所有设备数据(如传感器数据、设备控制指令、用户行为数据)均在本地处理,不上传至云端,保障用户隐私; 2. 系统具备低延迟、高稳定性,支持断网情况下智能家居设备正常联动; 3. 适配常见智能家居设备(如温湿度传感器、人体红外传感器、智能灯、智能窗帘、智能空调、智能门锁); 4. 支持简单的自动化规则自定义(如'当检测到有人且光线低于阈值时自动开灯' )。 5. 核心功能模块设计: - 设备接入模块:如何适配不同协议的设备,实现数据采集; - 本地化数据处理模块:实时数据解析、清洗、转换的逻辑; - 规则引擎模块:用户自定义自动化规则的存储与执行逻辑; - 本地存储模块:数据存储策略(如哪些数据需要时序表、哪些数据持久化关系表、存储周期);
此时可以看到,智能体开始自动构建任务,生成开发方案、编写代码。
3.2、系统整体架构 系统采用分层架构设计,所有模块均基于本地运行,数据全量存储在 KaiwuDB,实现零云端依赖
┌─────────────────────────────────────────────────────────────┐ │ 智能家居本地化数据处理系统 │ ├─────────────────────────────────────────────────────────────┤ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ │ 设备接入层 │ │ 数据处理层 │ │ 规则引擎层 │ │ │ │ - MQTT协议 │ │ - 数据解析 │ │ - 规则存储 │ │ │ │ - HTTP协议 │ │ - 数据清洗 │ │ - 规则匹配 │ │ │ │ - CoAP协议 │ │ - 数据转换 │ │ - 规则执行 │ │ │ │ - Zigbee网关 │ │ - 数据验证 │ │ - 动作触发 │ │ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │ │ │ │ │ │ └──────────────────┼──────────────────┘ │ │ │ │ │ ┌────────▼────────┐ │ │ │ KaiwuDB数据库 │ │ │ │ - 时序数据表 │ │ │ │ - 设备信息表 │ │ │ │ - 规则配置表 │ │ │ │ - 事件日志表 │ │ │ └─────────────────┘ │ └─────────────────────────────────────────────────────────────┘
3.3、项目结构 CodeArts 智能体将生成规范的项目目录,所有模块解耦设计,便于后续扩展与维护,核心项目结构如下:
smarthome-system / ├── README.md ├── docker-compose.yml
3.4、KaiwuDB 核心库表设计 -- 创建时序数据库 CREATE TS DATABASE IF NOT EXISTS tsdb; USE tsdb; -- ============================================ -- 时序表设计 -- ============================================ -- 1 . 传感器时序数据表 CREATE TABLE IF NOT EXISTS home_sensor ( ts TIMESTAMP NOT NULL, value FLOAT, status VARCHAR(32 ), file_path VARCHAR (128 ) ) ATTRIBUTES ( device_id VARCHAR(64 ) NOT NULL, data_type VARCHAR (32 ) NOT NULL, location VARCHAR (32 ), device_model VARCHAR (64 ) ) PRIMARY TAGS (device_id) RETENTIONS 20 D ACTIVETIME 12 h; -- 2 . 设备控制指令时序表 CREATE TABLE IF NOT EXISTS device_commands ( ts TIMESTAMP NOT NULL, command_type VARCHAR(64 ), parameters varchar (64 ), status VARCHAR (32 ), executed_at TIMESTAMP, error_message VARCHAR (256 ) ) ATTRIBUTES ( device_id VARCHAR(64 ) NOT NULL, command_id VARCHAR (128 ), user_id VARCHAR (64 ), source VARCHAR (32 ) ) PRIMARY TAGS (device_id) RETENTIONS 30 D ACTIVETIME 1 h; -- 3 . 规则执行历史时序表 CREATE TABLE IF NOT EXISTS rule_execution ( ts TIMESTAMP NOT NULL, trigger_event VARCHAR(32 ), conditions_matched VARCHAR (32 ), actions_executed VARCHAR (32 ), status VARCHAR (32 ), execution_time_ms INT, error_message VARCHAR (256 ) ) ATTRIBUTES ( rule_id INT NOT NULL, rule_name VARCHAR(128 ) NOT NULL, priority INT, triggered_by VARCHAR (64 ) ) PRIMARY TAGS (rule_id) RETENTIONS 60 D ACTIVETIME 6 h; -- 4 . 系统事件时序表 CREATE TABLE IF NOT EXISTS system_events ( ts TIMESTAMP NOT NULL, event_type VARCHAR(64 ), event_level VARCHAR (32 ), message VARCHAR (512 ), details VARCHAR (64 ) ) ATTRIBUTES ( source VARCHAR(64 ) NOT NULL, device_id VARCHAR (64 ) ) PRIMARY TAGS (source) RETENTIONS 30 D ACTIVETIME 1 h; -- 5 . 用户行为时序表 CREATE TABLE IF NOT EXISTS user_activity ( ts TIMESTAMP NOT NULL, action_type VARCHAR(64 ), device_id VARCHAR (64 ), action_details VARCHAR (64 ) ) ATTRIBUTES ( user_id VARCHAR(64 ) NOT NULL, session_id VARCHAR (128 ), ip_address VARCHAR (64 ), user_agent VARCHAR (256 ) ) PRIMARY TAGS (user_id) RETENTIONS 90 D ACTIVETIME 24 h;
USE defaultdb; -- 1. 设备管理表(关系型表) CREATE TABLE IF NOT EXISTS devices ( id INT PRIMARY KEY, name STRING NOT NULL , device_id STRING NOT NULL UNIQUE, -- 设备唯一标识 device_type STRING NOT NULL , protocol STRING NOT NULL , location STRING, status STRING DEFAULT 'online' , config JSONB, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ; CREATE INDEX IF NOT EXISTS idx_devices_device_id ON devices (device_id) ; CREATE INDEX IF NOT EXISTS idx_devices_type ON devices (device_type) ; CREATE INDEX IF NOT EXISTS idx_devices_location ON devices (location) ; CREATE INDEX IF NOT EXISTS idx_devices_status ON devices (status) ; -- 2. 自动化规则表(关系型表) CREATE TABLE IF NOT EXISTS automation_rules ( id INT PRIMARY KEY, name STRING NOT NULL , description STRING, enabled BOOL DEFAULT TRUE, priority INT DEFAULT 0 , conditions JSONB NOT NULL , actions JSONB NOT NULL , created_by STRING DEFAULT 'system' , created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, last_triggered_at TIMESTAMP ) ; CREATE INDEX IF NOT EXISTS idx_rules_enabled ON automation_rules (enabled) ; CREATE INDEX IF NOT EXISTS idx_rules_priority ON automation_rules (priority DESC) ; -- 3. 数据聚合统计表(关系型表) CREATE TABLE IF NOT EXISTS data_statistics ( id INT PRIMARY KEY, device_id STRING NOT NULL , data_type STRING NOT NULL , stat_type STRING NOT NULL , time_window STRING NOT NULL , stat_value DECIMAL(20 ,6 ), stat_time TIMESTAMP NOT NULL , calculated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ; CREATE INDEX IF NOT EXISTS idx_data_stats_device ON data_statistics (device_id, data_type, stat_type, time_window, stat_time) ; -- 4. 设备状态快照表(关系型表) CREATE TABLE IF NOT EXISTS device_snapshots ( id INT PRIMARY KEY, device_id STRING NOT NULL , snapshot_time TIMESTAMP NOT NULL , current_state JSONB, battery_level INT, signal_strength INT, last_seen TIMESTAMP ) ; CREATE INDEX IF NOT EXISTS idx_device_snapshots_device ON device_snapshots (device_id, snapshot_time DESC) ; -- ============================================ -- 创建序列 -- ============================================ CREATE SEQUENCE IF NOT EXISTS devices_seq START 1 ; CREATE SEQUENCE IF NOT EXISTS automation_rules_seq START 1 ; CREATE SEQUENCE IF NOT EXISTS data_statistics_seq START 1 ; CREATE SEQUENCE IF NOT EXISTS device_snapshots_seq START 1 ;
-- 创建视图 -- ============================================ CREATE VIEW IF NOT EXISTS v_device_latest_status AS SELECT d.id, d.name, d.device_id, d.device_type, d.location, d.status, hs.ts as last_update, hs.value, hs.data_type, hs.status as sensor_status FROM devices d LEFT JOIN ( SELECT device_id, data_type, ts, value, status, ROW_NUMBER() OVER (PARTITION BY device_id ORDER BY ts DESC) as rn FROM tsdb.home_sensor ) hs ON d.device_id = hs.device_id AND hs.rn = 1 ; CREATE VIEW IF NOT EXISTS v_active_rules AS SELECT id, name, description, priority, conditions, actions, last_triggered_at FROM automation_rules WHERE enabled = TRUE ORDER BY priority DESC;
POST /api/data/receive - 接收设备数据 POST /api/device/<id >/control - 控制设备 GET /api/devices - 获取设备列表 GET /api/sensor-data/latest - 获取最新传感器数据 GET /api/rules - 获取自动化规则 GET /api/statistics - 获取统计数据
curl -X POST http://localhost:5001 /api/data /receive \ -H "Content-Type: application/json" \ -d '{"device_id" : "sensor_temp_livingroom" , "value" : 26.5 , "data_type" : "temperature" , "unit" : "°C" }'
curl -X POST http://localhost:5001/api/device/5/control \ -H "Content-Type: application/json" \ -d '{"command": "turn_on", "parameters": {"brightness": 80}}'
INSERT INTO devices (id, name, device_id, device_type, protocol, location, status, config) VALUES (1 , '客厅温度传感器' , 'sensor_temp_livingroo m', ' temperature', ' mqtt', ' 客厅', ' online', ' {"topic" : "sensors/livingroom/temp" , "interval" : 60 }'), (2, ' 客厅湿度传感器', ' sensor_humidity_livingroom', ' humidity', ' mqtt', ' 客厅', ' online', ' {"topic" : "sensors/livingroom/humidity" , "interval" : 60 }'), (3, ' 客厅人体传感器', ' sensor_motion_livingroom', ' motion', ' mqtt', ' 客厅', ' online', ' {"topic" : "sensors/livingroom/motion" , "interval" : 5 }'), (4, ' 客厅光照传感器', ' sensor_light_livingroom', ' light', ' mqtt', ' 客厅', ' online', ' {"topic" : "sensors/livingroom/light" , "interval" : 30 }'), (5, ' 客厅智能灯', ' device_light_livingroom', ' smart_light', ' mqtt', ' 客厅', ' online', ' {"topic" : "devices/livingroom/light" , "supported_commands" : ["turn_on" , "turn_off" , "set_brightness" , "set_color" ]}'), (6, ' 卧室温度传感器', ' sensor_temp_bedroom', ' temperature', ' mqtt', ' 卧室', ' online', ' {"topic" : "sensors/bedroom/temp" , "interval" : 60 }'), (7, ' 卧室智能灯', ' device_light_bedroom', ' smart_light', ' mqtt', ' 卧室', ' online', ' {"topic" : "devices/bedroom/light" , "supported_commands" : ["turn_on" , "turn_off" , "set_brightness" ]}'), (8, ' 智能门锁', ' device_lock_entrance', ' smart_lock', ' mqtt', ' 玄关', ' online', ' {"topic" : "devices/entrance/lock" , "supported_commands" : ["lock" , "unlock" , "get_status" ]}'), (9, ' 智能窗帘', ' device_curtain_livingroom', ' smart_curtain', ' mqtt', ' 客厅', ' online', ' {"topic" : "devices/livingroom/curtain" , "supported_commands" : ["open" , "close" , "set_position" ]}'), (10, ' 智能空调', ' device_ac_livingroom', ' smart_ac', ' mqtt', ' 客厅', ' online', ' {"topic" : "devices/livingroom/ac" , "supported_commands" : ["turn_on" , "turn_off" , "set_temperature" , "set_mode" ]}}'); -- ============================================ -- 插入示例自动化规则 -- ============================================ INSERT INTO automation_rules (id, name, description, enabled, priority, conditions, actions) VALUES (1, ' 客厅有人自动开灯', ' 检测到客厅有人且光线低于阈值时自动开灯', TRUE, 10, ' {"conditions" : [{"device_id" : "sensor_motion_livingroom" , "type" : "motion" , "operator" : "=" , "value" : true }, {"device_id" : "sensor_light_livingroom" , "type" : "light" , "operator" : "<" , "value" : 100 }], "logic" : "AND" }', ' {"actions" : [{"device_id" : "device_light_livingroom" , "command" : "turn_on" , "parameters" : {"brightness" : 80 }}]}'), (2, ' 客厅无人自动关灯', ' 客厅无人超过10 分钟自动关灯', TRUE, 5, ' {"conditions" : [{"device_id" : "sensor_motion_livingroom" , "type" : "motion" , "operator" : "=" , "value" : false , "duration" : 600 }], "logic" : "AND" }', ' {"actions" : [{"device_id" : "device_light_livingroom" , "command" : "turn_off" }]}'), (3, ' 温度过高自动开空调', ' 客厅温度超过28 度自动开启空调', TRUE, 8, ' {"conditions" : [{"device_id" : "sensor_temp_livingroom" , "type" : "temperature" , "operator" : ">" , "value" : 28 }], "logic" : "AND" }', ' {"actions" : [{"device_id" : "device_ac_livingroom" , "command" : "turn_on" , "parameters" : {"temperature" : 24 , "mode" : "cool" }}]}'), (4, ' 离家模式', ' 离家时关闭所有灯光和空调', TRUE, 15, ' {"conditions" : [{"device_id" : "device_lock_entrance" , "type" : "door_lock" , "operator" : "=" , "value" : "locked" }], "logic" : "AND" }', ' {"actions" : [{"device_id" : "device_light_livingroom" , "command" : "turn_off" }, {"device_id" : "device_light_bedroom" , "command" : "turn_off" }, {"device_id" : "device_ac_livingroom" , "command" : "turn_off" }]}');
#!/usr/bin/env python3 """ 智能家居Web应用主程序 提供数据展示和控制界面 """ import sys import os import json import logging from datetime import datetime, timedelta from flask import Flask, render_template, jsonify, request # 添加模块路径 sys.path .insert (0 , os.path.join(os.path.dirname(__file__), '../data-processor')) sys.path .insert (0 , os.path.join(os.path.dirname(__file__), '../rule-engine')) sys.path .insert (0 , os.path.join(os.path.dirname(__file__), '../storage')) logging.basicConfig (level=logging.INFO) logger = logging.getLogger (__name__) app = Flask (__name__, template_folder='templates', static_folder='static') # 模拟数据库连接 class MockDB: "" "模拟数据库连接" "" def execute_query (self, query, params=None): "" "执行查询" "" import subprocess try: cmd = ['docker' , 'exec' , 'kwdb-server' , '/kaiwudb/bin/kwbase' , 'sql' , '--insecure' , '--execute' , query] result = subprocess.run (cmd, capture_output=True, text=True) if result.returncode == 0 : lines = result.stdout.strip ().split ('\n' ) if len (lines) > 1 : # 解析结果 headers = lines[0 ].split ('\t' ) data = [] for line in lines[1 :]: values = line.split ('\t' ) if len (values) == len (headers): data.append (values) return data return [] except Exception as e: logger.error (f"查询失败: {e}" ) return [] db = MockDB () @app.route ('/' ) def index (): "" "首页" "" return render_template ('index.html' ) @app.route ('/api/devices' ) def get_devices (): "" "获取设备列表" "" " SELECT id, name, device_id, device_type, protocol, location, status FROM devices ORDER BY id " "" results = db.execute_query (query) devices = [] if results and len (results) > 1 : headers = results[0 ] for row in results[1 :]: device = { 'id': row[0 ], 'name' : row[1 ], 'device_id' : row[2 ], 'device_type' : row[3 ], 'protocol' : row[4 ], 'location' : row[5 ], 'status' : row[6 ] } devices.append (device) return jsonify (devices) @app .route('/api/devices/<int:device_id>/data' ) def get_device_data(device_id): "" "获取设备数据" "" hours = request.args.get('hours' , 24 , type=int) start_time = (datetime.now() - timedelta(hours=hours)).isoformat() # 获取设备信息 device_query = f"SELECT device_id FROM devices WHERE id = {device_id}" device_result = db.execute_query(device_query) if not device_result or len(device_result) < 2 : return jsonify({'error': '设备不存在' }), 404 device_id_str = device_result[1] [0] # 获取传感器数据 data_query = f""" SELECT ts, value, data_type, status FROM sensor_data WHERE device_id = '{device_id_str}' AND ts >= '{start_time}' ORDER BY ts DESC LIMIT 100 """ results = db.execute_query (data_query) data_points = [] if results and len (results) > 1 : for row in results[1 :]: data_points.append ({ 'timestamp': row[0 ], 'value' : float (row[1 ]) if row[1 ] else None, 'data_type' : row[2 ], 'status' : row[3 ] }) return jsonify ({ 'device_id': device_id, 'device_id_str': device_id_str, 'data': data_points }) @app .route('/api/sensor-data/latest' ) def get_latest_sensor_data(): "" "获取最新传感器数据" "" " SELECT d.id, d.name, d.device_type, d.location, d.status, sd.ts as last_update, sd.value, sd.data_type, sd.unit FROM devices d LEFT JOIN ( SELECT device_id, data_type, ts, value, unit, ROW_NUMBER() OVER (PARTITION BY device_id ORDER BY ts DESC) as rn FROM sensor_data ) sd ON d.device_id = sd.device_id AND sd.rn = 1 ORDER BY d.id " "" results = db.execute_query(query) sensor_data = [] if results and len(results) > 1 : for row in results[1 :]: sensor_data.append({ 'id': row[0 ], 'name' : row[1 ], 'device_type' : row[2 ], 'location' : row[3 ], 'status' : row[4 ], 'last_update' : row[5 ], 'value' : float (row[6 ]) if row[6 ] else None, 'data_type' : row[7 ], 'unit' : row[8 ] if len (row) > 8 else None }) return jsonify (sensor_data) @app .route('/api/rules' ) def get_rules(): "" "获取自动化规则" "" " SELECT id, name, description, enabled, priority, last_triggered_at FROM automation_rules ORDER BY priority DESC " "" results = db.execute_query(query) rules = [] if results and len(results) > 1 : for row in results[1 :]: rules.append({ 'id': row[0 ], 'name' : row[1 ], 'description' : row[2 ], 'enabled' : row[3 ] == 't' , 'priority' : int (row[4 ]), 'last_triggered_at' : row[5 ] }) return jsonify (rules) @app .route('/api/statistics' ) def get_statistics(): "" "获取统计数据" "" stats = {} # 设备统计" SELECT device_type, COUNT (*) as count, COUNT (CASE WHEN status = 'online' THEN 1 END) as online_count FROM devices GROUP BY device_type """ device_results = db.execute_query (device_query) stats['devices' ] = {} if device_results and len (device_results) > 1 : for row in device_results[1 :]: stats['devices' ][row[0 ]] = { 'total': int (row[1 ]), 'online' : int (row[2 ]) } # 传感器数据统计" SELECT COUNT (*) as total_records FROM sensor_data """ data_results = db.execute_query (data_query) if data_results and len (data_results) > 1 : stats['sensor_data' ] = { 'total_records': int (data_results[1 ][0 ]) } # 规则统计" SELECT COUNT (*) as total, COUNT (CASE WHEN enabled = TRUE THEN 1 END) as enabled FROM automation_rules """ rule_results = db.execute_query (rule_query) if rule_results and len (rule_results) > 1 : stats['rules' ] = { 'total': int (rule_results[1 ][0 ]), 'enabled' : int (rule_results[1 ][1 ]) } return jsonify (stats) @app .route('/api/data/receive' , methods=['POST' ]) def receive_data(): "" "接收设备数据" "" try : data = request.json # 验证数据 if not data or 'device_id' not in data or 'value' not in data : return jsonify({'error': '缺少必要字段' }), 400 # 插入数据 device_id = data['device_id' ] value = data['value' ] data_type = data.get ('data_type', 'value') status = data.get ('status', 'good') unit = data.get ('unit', '') query = f""" INSERT INTO sensor_data (time, device_id, data_type, value, status, unit) VALUES (NOW(), '{device_id}', '{data_type}', {value}, '{status}', '{unit}') """ db.execute_query (query) return jsonify ({'success': True, 'message': '数据接收成功'}) except Exception as e: logger.error (f"接收数据失败: {e}" ) return jsonify ({'error': str (e)}), 500 @app .route('/api/device/<int:device_id>/control' , methods=['POST' ]) def control_device(device_id): "" "控制设备" "" try : data = request.json if not data or 'command' not in data : return jsonify({'error': '缺少命令参数' }), 400 command = data['command' ] parameters = data.get ('parameters', {}) # 获取设备信息 device_query = f"SELECT device_id, name FROM devices WHERE id = {device_id}" device_result = db.execute_query (device_query) if not device_result or len (device_result) < 2 : return jsonify ({'error': '设备不存在' }), 404 device_id_str = device_result[1] [0] device_name = device_result[1] [1] # 记录控制指令 cmd_query = f""" INSERT INTO device_commands (time, device_id, command_type, parameters, status) VALUES (NOW(), '{device_id_str}', '{command}', '{json.dumps (parameters)}', 'success') """ db.execute_query (cmd_query) logger.info (f"控制设备: {device_name} ({device_id_str}), 命令: {command}") return jsonify ({ 'success': True, 'message': f'设备 {device_name} 控制指令已发送', 'command': command, 'parameters': parameters }) except Exception as e: logger.error (f"控制设备失败: {e}" ) return jsonify ({'error': str (e)}), 500 if __name__ == '__main__': app.run (host='0.0.0.0' , port=5001 , debug=True)
四、系统运行与功能测试 启动系统 Web API 服务,并通过接口测试 与页面验证 ,确认系统所有功能正常运行
1、启动系统 Web API 服务 cd /root/workspace/smarthome-system/web-api && python3 simple_app.py > /tmp/smarthome-web-5002.log 2>&1 &
root@kwdb :~ / workspace/ smarthome- system / web- api# python3 simple_app.py 启动智能家居Web服务... * Serving Flask app 'simple_app' * Debug mode: off WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead. * Running on all addresses (0.0 .0 .0 ) * Running on http:/ / 127.0 .0 .1 :5002 * Running on http:/ / 192.168 .150 .111 :5002 Press CTRL+ C to quit 127.0 .0 .1 - - [27 / Feb/ 2026 13 :06 :51 ] "GET / HTTP/1.1" 200 - 127.0 .0 .1 - - [27 / Feb/ 2026 13 :06 :51 ] "GET /static/css/style.css HTTP/1.1" 200 - 127.0 .0 .1 - - [27 / Feb/ 2026 13 :06 :51 ] "GET /static/js/app.js HTTP/1.1" 200 - 执行查询: SELECT id, name, device_type, protocol, location, status FROM devices ORDER BY id 127.0 .0 .1 - - [27 / Feb/ 2026 13 :06 :51 ] "GET /favicon.ico HTTP/1.1" 404 - 查询结果: [['1' , '客厅温度传感器' , 'temperature' , 'mqtt' , '客厅' , 'online' ], ['2' , '客厅湿度传感器' , 'humidity' , 'mqtt' , '客厅' , 'online' ], ['3' , '客厅人体传感器' , 'motion' , 'mqtt' , '客厅' , 'online' ], ['4' , '客厅光照传感器' , 'light' , 'mqtt' , '客厅' , 'online' ], ['5' , '客厅智能灯' , 'smart_light' , 'mqtt' , '客厅' , 'online' ], ['6' , '卧室温度传感器' , 'temperature' , 'mqtt' , '卧室' , 'online' ], ['7' , '卧室智能灯' , 'smart_light' , 'mqtt' , '卧室' , 'online' ], ['8' , '智能门锁' , 'smart_lock' , 'mqtt' , '玄关' , 'online' ], ['9' , '智能窗帘' , 'smart_curtain' , 'mqtt' , '客厅' , 'online' ], ['10' , '智能空调' , 'smart_ac' , 'mqtt' , '客厅' , 'online' ]]
启动成功后,终端将显示:Running on http://[你的服务器IP]:5002,即服务启动完成,点击在浏览器中打开可以看到程序运行的界面
系统概览:设备总数、在线设备数、自动化规则数、数据记录数及设备列表
实时数据:温湿度、光照、人体感应等传感器的最新数据与更新时间;自动化规则:所有规则的名称、描述、启用状态、优先级;
系统状态:KaiwuDB 版本、服务运行状态、接口文档,这里为了方便把数据接口放在了最下面,便于查找调用
2、测试接口 curl -X POST http://localhost:5002 /api/data /receive -H "Content-Type: application/json" -d '{"device_id" : "temperature" , "value" : 32 , "data_type" : "temperature" , "unit" : "°C" }'
将传感器实时数据上报至本地系统,数据自动存储至 KaiwuDB 时序表,这里可以看到客厅问题传感器已经标称32度
curl -s http://localhost:5002/api/devices
至此项目开发全部结束,希望对想研究ai及kwdb数据库的同学有一些帮助
五、总结 本文以 KaiwuDB 为数据基座,华为 CodeArts 代码智能体为自动化开发引擎,实现了智能家居本地化数据处理系统的快速构建,全程无需复杂的手动编码,新手也可按步骤复现。随着 AIoT 技术的发展,本地化数据处理将成为智能家居的主流趋势,KaiwuDB 多模数据库让开发者实现 '轻量、高效、安全' 的智能场景落地。
相关免费在线工具 curl 转代码 解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
Markdown转HTML 将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
HTML转Markdown 将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
JSON 压缩 通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online