跳到主要内容物联网数据采集与可视化:基于 Python 和 MQTT 的实时监控系统 | 极客日志Python大前端
物联网数据采集与可视化:基于 Python 和 MQTT 的实时监控系统
本方案通过 Python、MQTT 和 WebSocket 技术构建了一套物联网数据采集与可视化系统。系统包含模拟传感器发布数据、后端服务订阅处理及前端实时展示三个核心环节。重点解决了多线程环境下 MQTT 回调与异步 WebSocket 服务的并发问题,利用队列机制确保数据流转稳定。最终实现了温度和湿度数据的实时刷新与动态图表展示,为 IoT 监控提供了可扩展的原型参考。
云间漫步1 浏览 物联网数据采集与可视化:基于 Python 和 MQTT 的实时监控系统
引言
在物联网项目中,数据只是原材料。如果无法实时处理、分析和展示这些数据,我们就很难做出及时的决策。今天我们来搭建一个完整的物联网数据采集与可视化系统,利用 Python、MQTT 协议和 WebSocket 技术,实现从传感器到前端页面的实时数据监控。
我们将模拟一个温度传感器系统,涵盖以下流程:
- 模拟传感器数据生成
- 使用 MQTT 协议发布数据
- 通过 Python 服务订阅并处理数据
- 将处理后的数据通过 WebSocket 推送到前端
- 使用 HTML/CSS/JavaScript 实现可视化展示
系统架构
整个链路非常清晰:
[传感器] --> [MQTT Broker] --> [Python 服务] --> [WebSocket Server] --> [Web 浏览器]
环境准备
我们需要安装以下依赖:
- Python 3.x
paho-mqtt: MQTT 客户端库
websockets: WebSocket 服务端库
asyncio: 异步 I/O 库(Python 内置)
flask: Web 框架(用于提供静态文件)
matplotlib: 数据可视化库
numpy: 数值计算库
安装命令如下:
pip install paho-mqtt websockets flask matplotlib numpy
项目结构
为了保持代码整洁,我们采用如下目录结构:
iot_monitoring/
├── app.py # 主应用逻辑
├── mqtt_publisher.py # MQTT 发布者(模拟传感器)
├── requirements.txt # 依赖包列表
├── templates/
│ └── index.html # 前端页面
└── static/
└── script.js # 前端 JavaScript
1. MQTT 发布者(模拟传感器)
首先,我们需要一个'设备'来产生数据。创建 mqtt_publisher.py 文件:
import random
import time
import json
import paho.mqtt.client as mqtt
MQTT_BROKER =
MQTT_PORT =
TOPIC =
SENSOR_ID =
():
rc == :
()
:
()
():
client = mqtt.Client()
client.on_connect = on_connect
client.connect(MQTT_BROKER, MQTT_PORT, )
:
:
temperature = (random.uniform(, ), )
humidity = (random.uniform(, ), )
payload = {
: SENSOR_ID,
: temperature,
: humidity,
: time.time()
}
client.publish(TOPIC, json.dumps(payload))
()
time.sleep()
KeyboardInterrupt:
()
client.disconnect()
__name__ == :
simulate_sensor_data()
"localhost"
1883
"iot/sensors/temperature"
"temp_001"
def
on_connect
client, userdata, flags, rc
if
0
print
"Connected to MQTT Broker!"
else
print
f"Failed to connect, return code {rc}"
def
simulate_sensor_data
60
try
while
True
round
18.0
30.0
2
round
40.0
80.0
2
"sensor_id"
"temperature"
"humidity"
"timestamp"
print
f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Published: {payload}"
5
except
print
"\nStopping publisher..."
if
"__main__"
这里的关键是构建符合预期的 JSON 负载,确保时间戳为 Unix 时间戳,方便后续解析。
2. 主应用逻辑
这是系统的核心。我们需要同时运行 Flask 服务器、WebSocket 服务和 MQTT 客户端。由于 MQTT 回调是同步的,而 WebSocket 是异步的,直接混用会导致线程安全问题。因此,我们引入队列来解耦这两个部分。
import asyncio
import websockets
import json
import threading
import time
import queue
import paho.mqtt.client as mqtt
from flask import Flask, render_template
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
latest_data = {}
data_queue = queue.Queue()
connected_clients = set()
lock = threading.Lock()
MQTT_BROKER = "localhost"
MQTT_PORT = 1883
MQTT_TOPIC = "iot/sensors/temperature"
app = Flask(__name__)
@app.route('/')
def index():
return render_template('index.html')
async def handle_websocket(websocket, path):
logger.info(f"WebSocket client connected from {websocket.remote_address}")
with lock:
connected_clients.add(websocket)
try:
async for message in websocket:
pass
except websockets.exceptions.ConnectionClosed:
logger.info(f"WebSocket client disconnected from {websocket.remote_address}")
finally:
with lock:
connected_clients.discard(websocket)
async def broadcast_data(data):
"""向所有连接的 WebSocket 客户端广播数据"""
if connected_clients:
data_to_send = json.dumps(data)
tasks = []
with lock:
clients_copy = list(connected_clients)
for client in clients_copy:
tasks.append(client.send(data_to_send))
await asyncio.gather(*tasks, return_exceptions=True)
def on_message(client, userdata, msg):
"""处理从 MQTT 接收到的消息"""
try:
data = json.loads(msg.payload.decode())
global latest_data
latest_data = data
logger.info(f"Received MQTT message: {data}")
data_queue.put(data)
except Exception as e:
logger.error(f"Error processing MQTT message: {e}")
def start_mqtt_client():
"""启动 MQTT 客户端"""
client = mqtt.Client()
client.on_message = on_message
client.connect(MQTT_BROKER, MQTT_PORT, 60)
client.subscribe(MQTT_TOPIC)
logger.info("MQTT Client started and subscribed to topic.")
client.loop_forever()
async def main():
"""主异步函数"""
def run_flask():
app.run(host='0.0.0.0', port=5000, debug=False)
flask_thread = threading.Thread(target=run_flask)
flask_thread.daemon = True
flask_thread.start()
mqtt_thread = threading.Thread(target=start_mqtt_client)
mqtt_thread.daemon = True
mqtt_thread.start()
server = await websockets.serve(handle_websocket, "0.0.0.0", 8765)
logger.info("WebSocket server started on ws://0.0.0.0:8765")
async def process_queue():
while True:
try:
data = data_queue.get_nowait()
await broadcast_data(data)
except queue.Empty:
await asyncio.sleep(0.1)
asyncio.create_task(process_queue())
await server.wait_closed()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Shutting down...")
注意这里使用了 queue.Queue 和锁机制。这是因为 MQTT 回调在独立线程中运行,而 WebSocket 操作必须在事件循环的主线程中进行。这种设计避免了竞态条件,让代码更健壮。
3. 前端页面
创建一个简单的仪表盘。创建 templates/index.html 文件:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>IoT Temperature Monitor</title>
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
<style>
body { font-family: Arial, sans-serif; margin: 20px; background-color: #f5f5f5; }
.container { max-width: 1200px; margin: auto; background-color: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); }
h1 { color: #333; text-align: center; }
.data-display { display: flex; justify-content: space-around; margin-bottom: 20px; flex-wrap: wrap; }
.sensor-card { background-color: #e9ecef; padding: 15px; border-radius: 5px; margin: 10px; min-width: 200px; text-align: center; }
.sensor-value { font-size: 2em; font-weight: bold; }
.chart-container { width: 100%; height: 400px; margin-top: 20px; }
.status { text-align: center; margin-top: 10px; font-weight: bold; }
.status.connected { color: green; }
.status.disconnected { color: red; }
</style>
</head>
<body>
<div class="container">
<h1>IoT Temperature & Humidity Monitor</h1>
<div id="status" class="status disconnected">Connecting...</div>
<div class="data-display">
<div class="sensor-card">
<h3>Temperature</h3>
<div id="temperature" class="sensor-value">-- °C</div>
</div>
<div class="sensor-card">
<h3>Humidity</h3>
<div id="humidity" class="sensor-value">-- %</div>
</div>
<div class="sensor-card">
<h3>Last Update</h3>
<div id="timestamp" class="sensor-value">--</div>
</div>
</div>
<div class="chart-container">
<canvas id="sensorChart"></canvas>
</div>
</div>
<script src="/static/script.js"></script>
</body>
</html>
4. 前端 JavaScript
负责连接 WebSocket 并更新图表。创建 static/script.js 文件:
const wsUri = "ws://localhost:8765";
let websocket;
let chart;
document.addEventListener('DOMContentLoaded', function() {
const ctx = document.getElementById('sensorChart').getContext('2d');
chart = new Chart(ctx, {
type: 'line',
data: {
labels: [],
datasets: [
{
label: 'Temperature (°C)',
data: [],
borderColor: 'rgb(255, 99, 132)',
backgroundColor: 'rgba(255, 99, 132, 0.2)',
tension: 0.1,
fill: false
},
{
label: 'Humidity (%)',
data: [],
borderColor: 'rgb(54, 162, 235)',
backgroundColor: 'rgba(54, 162, 235, 0.2)',
tension: 0.1,
fill: false
}
]
},
options: {
responsive: true,
maintainAspectRatio: false,
scales: {
x: { title: { display: true, text: 'Time' } },
y: { beginAtZero: false }
},
plugins: {
legend: { position: 'top' },
title: { display: true, text: 'Real-time Sensor Data' }
}
}
});
connectWebSocket();
});
function connectWebSocket() {
console.log("Attempting to connect to WebSocket server...");
websocket = new WebSocket(wsUri);
websocket.onopen = function(event) {
console.log("WebSocket connection opened.");
document.getElementById('status').textContent = "Connected";
document.getElementById('status').className = "status connected";
};
websocket.onclose = function(event) {
console.log("WebSocket connection closed.");
document.getElementById('status').textContent = "Disconnected";
document.getElementById('status').className = "status disconnected";
setTimeout(connectWebSocket, 5000);
};
websocket.onerror = function(error) {
console.error("WebSocket error:", error);
document.getElementById('status').textContent = "Connection Error";
document.getElementById('status').className = "status disconnected";
};
websocket.onmessage = function(event) {
try {
const data = JSON.parse(event.data);
console.log("Received data:", data);
document.getElementById('temperature').textContent = data.temperature.toFixed(2) + ' °C';
document.getElementById('humidity').textContent = data.humidity.toFixed(2) + ' %';
const date = new Date(data.timestamp * 1000);
document.getElementById('timestamp').textContent = date.toLocaleString();
updateChart(data);
} catch (e) {
console.error("Error parsing JSON:", e);
}
};
}
function updateChart(data) {
const timestamp = new Date(data.timestamp * 1000).toLocaleTimeString();
const temp = data.temperature;
const humidity = data.humidity;
chart.data.labels.push(timestamp);
chart.data.datasets[0].data.push(temp);
chart.data.datasets[1].data.push(humidity);
if (chart.data.labels.length > 20) {
chart.data.labels.shift();
chart.data.datasets[0].data.shift();
chart.data.datasets[1].data.shift();
}
chart.update();
}
5. 运行项目
步骤 1:启动 MQTT Broker
你需要先有一个 MQTT 代理运行。最简单的方法是使用 Docker 运行 Mosquitto:
docker run -d --name mosquitto -p 1883:1883 -p 9001:9001 --restart unless-stopped eclipse-mosquitto
或者,如果你在本地安装了 Mosquitto,请确保服务已启动。
步骤 2:启动主应用
- Flask Web 服务器 (监听端口 5000)
- WebSocket 服务器 (监听端口 8765)
- MQTT 客户端 (订阅主题
iot/sensors/temperature)
步骤 3:启动 MQTT 发布者
python3 mqtt_publisher.py
这个脚本会每隔 5 秒向 MQTT 主题 iot/sensors/temperature 发布一条模拟传感器数据。
步骤 4:访问前端页面
打开浏览器访问 http://localhost:5000。
- 实时更新的温度和湿度数值
- 最近一次更新的时间
- 一个实时折线图,显示温度和湿度随时间的变化
总结
这个项目展示了如何构建一个基本的物联网数据采集与可视化系统。我们使用了 MQTT 进行设备间通信,通过 Python 服务处理数据,并使用 WebSocket 实现实时推送,最后通过前端页面进行可视化展示。
- 可以轻松添加更多的传感器类型
- 可以集成更复杂的后端处理逻辑
- 可以使用不同的可视化库或框架
- 可以将数据存储到数据库中进行历史查询
通过这种方式,你可以快速搭建一个可用于监控和分析 IoT 数据的原型系统。
相关免费在线工具
- curl 转代码
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
- Markdown转HTML
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
- HTML转Markdown
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
- JSON 压缩
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online