物联网数据采集与可视化:使用 Python 和 MQTT 构建实时监控系统

物联网数据采集与可视化:使用 Python 和 MQTT 构建实时监控系统

引言

物联网(IoT)的核心在于连接各种设备并收集其产生的数据。然而,仅仅收集数据是不够的,我们需要能够实时地处理、分析和可视化这些数据,以便做出及时的决策。本文将带你构建一个完整的物联网数据采集与可视化系统,使用 Python、MQTT 协议和 WebSocket 技术,实现实时数据监控。

我们将创建一个模拟的温度传感器系统,该系统会:

  1. 模拟传感器数据生成
  2. 使用 MQTT 协议发布数据
  3. 通过 Python 服务订阅并处理数据
  4. 将处理后的数据通过 WebSocket 发送到前端
  5. 使用 HTML/CSS/JavaScript 实现前端可视化

系统架构

[传感器] --> [MQTT Broker] --> [Python 服务] --> [WebSocket Server] --> [Web 浏览器] 

所需软件和库

  • Python 3.x
  • paho-mqtt: MQTT 客户端库
  • websockets: WebSocket 服务端库
  • asyncio: 异步 I/O 库
  • flask: Web 框架(用于提供静态文件)
  • matplotlib: 数据可视化库
  • numpy: 数值计算库

项目结构

iot_monitoring/ ├── app.py # 主应用逻辑 ├── mqtt_publisher.py # MQTT 发布者(模拟传感器) ├── requirements.txt # 依赖包列表 ├── templates/ │ └── index.html # 前端页面 └── static/ └── script.js # 前端 JavaScript 

安装依赖

pip install paho-mqtt websockets flask matplotlib numpy asyncio 

1. MQTT 发布者(模拟传感器)

创建 mqtt_publisher.py 文件:

# mqtt_publisher.pyimport random import time import json import paho.mqtt.client as mqtt # MQTT 配置 MQTT_BROKER ="localhost" MQTT_PORT =1883 TOPIC ="iot/sensors/temperature"# 模拟传感器 ID SENSOR_ID ="temp_001"defon_connect(client, userdata, flags, rc):if rc ==0:print("Connected to MQTT Broker!")else:print("Failed to connect, return code %d\n", rc)defsimulate_sensor_data(): client = mqtt.Client() client.on_connect = on_connect # 连接到 MQTT 代理 client.connect(MQTT_BROKER, MQTT_PORT,60)# 开始循环发送数据try:whileTrue:# 模拟温度数据 (范围: 18°C - 30°C) temperature =round(random.uniform(18.0,30.0),2)# 模拟湿度数据 (范围: 40% - 80%) humidity =round(random.uniform(40.0,80.0),2) payload ={"sensor_id": SENSOR_ID,"temperature": temperature,"humidity": humidity,"timestamp": time.time()}# 发布消息 client.publish(TOPIC, json.dumps(payload))print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Published: {payload}")# 每 5 秒发送一次数据 time.sleep(5)except KeyboardInterrupt:print("\nStopping publisher...") client.disconnect()if __name__ =="__main__": simulate_sensor_data()

2. 主应用逻辑

创建 app.py 文件:

# app.pyimport asyncio import websockets import json import threading import time import paho.mqtt.client as mqtt from flask import Flask, render_template import logging # 设置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__)# 全局变量用于存储最新数据和 WebSocket 连接 latest_data ={} connected_clients =set()# MQTT 配置 MQTT_BROKER ="localhost" MQTT_PORT =1883 MQTT_TOPIC ="iot/sensors/temperature"# Flask 应用 app = Flask(__name__)@app.route('/')defindex():return render_template('index.html')# WebSocket 服务器asyncdefhandle_websocket(websocket, path):""" 处理 WebSocket 连接 """ logger.info(f"WebSocket client connected from {websocket.remote_address}")# 将新客户端加入集合 connected_clients.add(websocket)try:asyncfor message in websocket:# 可以在这里处理来自客户端的消息passexcept websockets.exceptions.ConnectionClosed: logger.info(f"WebSocket client disconnected from {websocket.remote_address}")finally:# 移除断开的客户端 connected_clients.discard(websocket)asyncdefbroadcast_data(data):""" 向所有连接的 WebSocket 客户端广播数据 """if connected_clients:# 如果有客户端连接# 准备要发送的数据 data_to_send = json.dumps(data)# 并发地向所有客户端发送await asyncio.gather(*[client.send(data_to_send)for client in connected_clients], return_exceptions=True)# MQTT 回调函数defon_message(client, userdata, msg):""" 处理从 MQTT 接收到的消息 """try: data = json.loads(msg.payload.decode())global latest_data latest_data = data logger.info(f"Received MQTT message: {data}")# 将数据广播给所有 WebSocket 客户端 asyncio.run(broadcast_data(data))except Exception as e: logger.error(f"Error processing MQTT message: {e}")defstart_mqtt_client():""" 启动 MQTT 客户端 """ client = mqtt.Client() client.on_message = on_message client.connect(MQTT_BROKER, MQTT_PORT,60) client.subscribe(MQTT_TOPIC) logger.info("MQTT Client started and subscribed to topic.") client.loop_forever()# 持续监听消息asyncdefmain():""" 主异步函数 """# 启动 Flask 应用 (在单独线程中)defrun_flask(): app.run(host='0.0.0.0', port=5000, debug=False) flask_thread = threading.Thread(target=run_flask) flask_thread.daemon =True flask_thread.start()# 启动 MQTT 客户端 (在单独线程中) mqtt_thread = threading.Thread(target=start_mqtt_client) mqtt_thread.daemon =True mqtt_thread.start()# 启动 WebSocket 服务器 server =await websockets.serve(handle_websocket,"0.0.0.0",8765) logger.info("WebSocket server started on ws://0.0.0.0:8765")await server.wait_closed()if __name__ =="__main__":try: asyncio.run(main())except KeyboardInterrupt: logger.info("Shutting down...")

3. 前端页面

创建 templates/index.html 文件:

<!-- templates/index.html --><!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><title>IoT Temperature Monitor</title><scriptsrc="https://cdn.jsdelivr.net/npm/chart.js"></script><style>body{font-family: Arial, sans-serif;margin: 20px;background-color: #f5f5f5;}.container{max-width: 1200px;margin: auto;background-color: white;padding: 20px;border-radius: 8px;box-shadow: 0 2px 10px rgba(0,0,0,0.1);}h1{color: #333;text-align: center;}.data-display{display: flex;justify-content: space-around;margin-bottom: 20px;flex-wrap: wrap;}.sensor-card{background-color: #e9ecef;padding: 15px;border-radius: 5px;margin: 10px;min-width: 200px;text-align: center;}.sensor-value{font-size: 2em;font-weight: bold;}.chart-container{width: 100%;height: 400px;margin-top: 20px;}.status{text-align: center;margin-top: 10px;font-weight: bold;}.status.connected{color: green;}.status.disconnected{color: red;}</style></head><body><divclass="container"><h1>IoT Temperature & Humidity Monitor</h1><divid="status"class="status disconnected">Connecting...</div><divclass="data-display"><divclass="sensor-card"><h3>Temperature</h3><divid="temperature"class="sensor-value">-- °C</div></div><divclass="sensor-card"><h3>Humidity</h3><divid="humidity"class="sensor-value">-- %</div></div><divclass="sensor-card"><h3>Last Update</h3><divid="timestamp"class="sensor-value">--</div></div></div><divclass="chart-container"><canvasid="sensorChart"></canvas></div></div><scriptsrc="/static/script.js"></script></body></html>

4. 前端 JavaScript

创建 static/script.js 文件:

// static/script.jsconst wsUri ="ws://localhost:8765";let websocket;let chart; document.addEventListener('DOMContentLoaded',function(){const ctx = document.getElementById('sensorChart').getContext('2d'); chart =newChart(ctx,{type:'line',data:{labels:[],// 时间标签datasets:[{label:'Temperature (°C)',data:[],borderColor:'rgb(255, 99, 132)',backgroundColor:'rgba(255, 99, 132, 0.2)',tension:0.1,fill:false},{label:'Humidity (%)',data:[],borderColor:'rgb(54, 162, 235)',backgroundColor:'rgba(54, 162, 235, 0.2)',tension:0.1,fill:false}]},options:{responsive:true,maintainAspectRatio:false,scales:{x:{title:{display:true,text:'Time'}},y:{beginAtZero:false}},plugins:{legend:{position:'top',},title:{display:true,text:'Real-time Sensor Data'}}}});connectWebSocket();});functionconnectWebSocket(){ console.log("Attempting to connect to WebSocket server..."); websocket =newWebSocket(wsUri); websocket.onopen=function(event){ console.log("WebSocket connection opened."); document.getElementById('status').textContent ="Connected"; document.getElementById('status').className ="status connected";}; websocket.onclose=function(event){ console.log("WebSocket connection closed."); document.getElementById('status').textContent ="Disconnected"; document.getElementById('status').className ="status disconnected";// 尝试重新连接setTimeout(connectWebSocket,5000);}; websocket.onerror=function(error){ console.error("WebSocket error:", error); document.getElementById('status').textContent ="Connection Error"; document.getElementById('status').className ="status disconnected";}; websocket.onmessage=function(event){try{const data =JSON.parse(event.data); console.log("Received data:", data);// 更新 DOM 元素 document.getElementById('temperature').textContent = data.temperature.toFixed(2)+' °C'; document.getElementById('humidity').textContent = data.humidity.toFixed(2)+' %';// 显示人类可读的时间const date =newDate(data.timestamp *1000); document.getElementById('timestamp').textContent = date.toLocaleString();// 更新图表updateChart(data);}catch(e){ console.error("Error parsing JSON:", e);}};}functionupdateChart(data){const timestamp =newDate(data.timestamp *1000).toLocaleTimeString();const temp = data.temperature;const humidity = data.humidity;// 添加新数据点 chart.data.labels.push(timestamp); chart.data.datasets[0].data.push(temp);// 温度 chart.data.datasets[1].data.push(humidity);// 湿度// 限制图表上的数据点数量(例如,保留最近的 20 个)if(chart.data.labels.length >20){ chart.data.labels.shift(); chart.data.datasets[0].data.shift(); chart.data.datasets[1].data.shift();}// 更新图表 chart.update();}

5. 运行项目

步骤 1: 启动 MQTT Broker

你需要先有一个 MQTT 代理运行。最简单的方法是使用 Docker 运行 Mosquitto:

docker run -d \ --name mosquitto \ -p 1883:1883 \ -p 9001:9001 \ --restart unless-stopped \ eclipse-mosquitto 

或者,如果你在本地安装了 Mosquitto,请确保服务已启动。

步骤 2: 启动主应用

在终端中运行:

python3 app.py 

这将启动:

  1. Flask Web 服务器 (监听端口 5000)
  2. WebSocket 服务器 (监听端口 8765)
  3. MQTT 客户端 (订阅主题 iot/sensors/temperature)

步骤 3: 启动 MQTT 发布者

在另一个终端中运行:

python3 mqtt_publisher.py 

这个脚本会每隔 5 秒向 MQTT 主题 iot/sensors/temperature 发布一条模拟传感器数据。

步骤 4: 访问前端页面

打开浏览器访问 http://localhost:5000

你应该能看到:

  1. 实时更新的温度和湿度数值
  2. 最近一次更新的时间
  3. 一个实时折线图,显示温度和湿度随时间的变化

总结

这个项目展示了如何构建一个基本的物联网数据采集与可视化系统。我们使用了 MQTT 进行设备间通信,通过 Python 服务处理数据,并使用 WebSocket 实现实时推送,最后通过前端页面进行可视化展示。

这个系统具有良好的扩展性:

  • 可以轻松添加更多的传感器类型
  • 可以集成更复杂的后端处理逻辑
  • 可以使用不同的可视化库或框架
  • 可以将数据存储到数据库中进行历史查询

通过这种方式,你可以快速搭建一个可用于监控和分析 IoT 数据的原型系统。

Read more

人工智能、机器学习和深度学习,其实不是一回事

人工智能、机器学习和深度学习,其实不是一回事

一、人工智能、机器学习与深度学习的真正区别 在当今科技领域,我们经常听到人工智能、机器学习和深度学习这三个词。它们虽然相关,但含义不同。 1.1 人工智能 人工智能是计算机科学的一个分支,旨在研究如何合成与分析能够像人一样行动的计算主体。简单来说,AI 的目标是利用计算机来模拟甚至替代人类大脑的功能。 一个理想的 AI 系统通常具备以下特征:像人一样思考、像人一样行动、理性地思考与行动。 1.2 机器学习 机器学习是实现人工智能的一种途径。它的核心定义是:赋予计算机在没有被显式编程的情况下进行学习的能力。 与传统的基于规则的编程不同,机器学习不依赖程序员手写每一条逻辑指令,而是通过算法让机器从大量数据中寻找规律,从而对新的数据产生预测或判断。 1.3 深度学习 深度学习是机器学习的一种特殊方法,也称为深度神经网络。它受人类大脑结构的启发,通过设计多层的神经元网络结构,来模拟万事万物的特征表示。 1.4 三者之间的层级关系 厘清这三者的关系对于初学者至关重要。人工智能 AI是最宏大的概念,包含了所有让机器变聪明的技术。机器学习 ML是 AI

By Ne0inhk

91代码神器:AI如何帮你自动生成高质量代码

快速体验 1. 打开 InsCode(快马)平台 https://www.inscode.net 2. 点击'项目生成'按钮,等待项目生成完整后预览效果 输入框内输入如下内容: 使用91代码平台,创建一个能够自动生成Python爬虫代码的AI助手。要求:1. 输入目标网站URL后自动分析页面结构;2. 根据分析结果生成可运行的爬虫代码;3. 支持反爬虫机制处理;4. 输出结构化数据存储方案。使用Kimi-K2模型实现智能代码生成,并确保代码包含必要的注释和异常处理。 最近尝试用AI工具自动生成Python爬虫代码,发现整个过程比想象中高效。通过91代码平台的Kimi-K2模型,只需要简单几步就能完成从页面分析到完整爬虫的生成。这里分享下具体实现过程和实际体验。 1. 输入目标网站URL 在平台对话框直接粘贴需要爬取的网址,比如某个电商商品页面。AI会先自动检测网页结构特征,识别出关键数据区域(如商品标题、价格、评论区块等),这步相当于人工查看网页源码的自动化版本。 2. 智能生成基础爬虫框架 根据分析结果,AI会生成包含Requests或Sel

By Ne0inhk
Claude Code安装与使用完全指南:2026 年最前沿的 AI 编程助手

Claude Code安装与使用完全指南:2026 年最前沿的 AI 编程助手

文章目录 * 前言 * 一、什么是 Claude Code? * 1.1 定义与定位 * 1.2 技术优势 * 二、安装前的环境准备 * 2.1 系统要求 * 2.2 前置依赖 * 三、Claude Code 全平台安装教程 * 3.1 安装方式对比 * 3.2 Windows 系统安装 * 3.3 macOS 系统安装 * 3.5 安装后初始化 * 四、配置与优化 * 4.1 配置文件位置 * 4.2 跳过新手引导 * 4.3 接入国产大模型(免翻墙方案)

By Ne0inhk
Flutter 三方库 mediapipe_core 的鸿蒙化适配指南 - 实现高性能的端侧 AI 推理库集成、支持多维视觉任务与手势/表情识别实战

Flutter 三方库 mediapipe_core 的鸿蒙化适配指南 - 实现高性能的端侧 AI 推理库集成、支持多维视觉任务与手势/表情识别实战

欢迎加入开源鸿蒙跨平台社区:https://openharmonycrossplatform.ZEEKLOG.net Flutter 三方库 mediapipe_core 的鸿蒙化适配指南 - 实现高性能的端侧 AI 推理库集成、支持多维视觉任务与手势/表情识别实战 前言 在进行 Flutter for OpenHarmony 的智能化应用开发时,集成强大的机器学习(ML)能力是打造差异化体验的关键。mediapipe_core 是谷歌 MediaPipe 框架在 Dart 侧的核心封装库。它能让你在鸿蒙真机上实现极其流畅的人脸检测、手势追踪以及实时姿态估计。本文将深入探讨如何在鸿蒙系统下构建低功耗、高响应的端侧 AI 推理链路。 一、原原理性解析 / 概念介绍 1.1 基础原理 mediapipe_core 作为 MediaPipe 的“神经中枢”

By Ne0inhk