物联网数据采集与可视化:使用 Python 和 MQTT 构建实时监控系统
物联网数据采集与可视化:使用 Python 和 MQTT 构建实时监控系统
引言
物联网(IoT)的核心在于连接各种设备并收集其产生的数据。然而,仅仅收集数据是不够的,我们需要能够实时地处理、分析和可视化这些数据,以便做出及时的决策。本文将带你构建一个完整的物联网数据采集与可视化系统,使用 Python、MQTT 协议和 WebSocket 技术,实现实时数据监控。
我们将创建一个模拟的温度传感器系统,该系统会:
- 模拟传感器数据生成
- 使用 MQTT 协议发布数据
- 通过 Python 服务订阅并处理数据
- 将处理后的数据通过 WebSocket 发送到前端
- 使用 HTML/CSS/JavaScript 实现前端可视化
系统架构
[传感器] --> [MQTT Broker] --> [Python 服务] --> [WebSocket Server] --> [Web 浏览器] 所需软件和库
- Python 3.x
paho-mqtt: MQTT 客户端库websockets: WebSocket 服务端库asyncio: 异步 I/O 库flask: Web 框架(用于提供静态文件)matplotlib: 数据可视化库numpy: 数值计算库
项目结构
iot_monitoring/ ├── app.py # 主应用逻辑 ├── mqtt_publisher.py # MQTT 发布者(模拟传感器) ├── requirements.txt # 依赖包列表 ├── templates/ │ └── index.html # 前端页面 └── static/ └── script.js # 前端 JavaScript 安装依赖
pip install paho-mqtt websockets flask matplotlib numpy asyncio 1. MQTT 发布者(模拟传感器)
创建 mqtt_publisher.py 文件:
# mqtt_publisher.pyimport random import time import json import paho.mqtt.client as mqtt # MQTT 配置 MQTT_BROKER ="localhost" MQTT_PORT =1883 TOPIC ="iot/sensors/temperature"# 模拟传感器 ID SENSOR_ID ="temp_001"defon_connect(client, userdata, flags, rc):if rc ==0:print("Connected to MQTT Broker!")else:print("Failed to connect, return code %d\n", rc)defsimulate_sensor_data(): client = mqtt.Client() client.on_connect = on_connect # 连接到 MQTT 代理 client.connect(MQTT_BROKER, MQTT_PORT,60)# 开始循环发送数据try:whileTrue:# 模拟温度数据 (范围: 18°C - 30°C) temperature =round(random.uniform(18.0,30.0),2)# 模拟湿度数据 (范围: 40% - 80%) humidity =round(random.uniform(40.0,80.0),2) payload ={"sensor_id": SENSOR_ID,"temperature": temperature,"humidity": humidity,"timestamp": time.time()}# 发布消息 client.publish(TOPIC, json.dumps(payload))print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Published: {payload}")# 每 5 秒发送一次数据 time.sleep(5)except KeyboardInterrupt:print("\nStopping publisher...") client.disconnect()if __name__ =="__main__": simulate_sensor_data()2. 主应用逻辑
创建 app.py 文件:
# app.pyimport asyncio import websockets import json import threading import time import paho.mqtt.client as mqtt from flask import Flask, render_template import logging # 设置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__)# 全局变量用于存储最新数据和 WebSocket 连接 latest_data ={} connected_clients =set()# MQTT 配置 MQTT_BROKER ="localhost" MQTT_PORT =1883 MQTT_TOPIC ="iot/sensors/temperature"# Flask 应用 app = Flask(__name__)@app.route('/')defindex():return render_template('index.html')# WebSocket 服务器asyncdefhandle_websocket(websocket, path):""" 处理 WebSocket 连接 """ logger.info(f"WebSocket client connected from {websocket.remote_address}")# 将新客户端加入集合 connected_clients.add(websocket)try:asyncfor message in websocket:# 可以在这里处理来自客户端的消息passexcept websockets.exceptions.ConnectionClosed: logger.info(f"WebSocket client disconnected from {websocket.remote_address}")finally:# 移除断开的客户端 connected_clients.discard(websocket)asyncdefbroadcast_data(data):""" 向所有连接的 WebSocket 客户端广播数据 """if connected_clients:# 如果有客户端连接# 准备要发送的数据 data_to_send = json.dumps(data)# 并发地向所有客户端发送await asyncio.gather(*[client.send(data_to_send)for client in connected_clients], return_exceptions=True)# MQTT 回调函数defon_message(client, userdata, msg):""" 处理从 MQTT 接收到的消息 """try: data = json.loads(msg.payload.decode())global latest_data latest_data = data logger.info(f"Received MQTT message: {data}")# 将数据广播给所有 WebSocket 客户端 asyncio.run(broadcast_data(data))except Exception as e: logger.error(f"Error processing MQTT message: {e}")defstart_mqtt_client():""" 启动 MQTT 客户端 """ client = mqtt.Client() client.on_message = on_message client.connect(MQTT_BROKER, MQTT_PORT,60) client.subscribe(MQTT_TOPIC) logger.info("MQTT Client started and subscribed to topic.") client.loop_forever()# 持续监听消息asyncdefmain():""" 主异步函数 """# 启动 Flask 应用 (在单独线程中)defrun_flask(): app.run(host='0.0.0.0', port=5000, debug=False) flask_thread = threading.Thread(target=run_flask) flask_thread.daemon =True flask_thread.start()# 启动 MQTT 客户端 (在单独线程中) mqtt_thread = threading.Thread(target=start_mqtt_client) mqtt_thread.daemon =True mqtt_thread.start()# 启动 WebSocket 服务器 server =await websockets.serve(handle_websocket,"0.0.0.0",8765) logger.info("WebSocket server started on ws://0.0.0.0:8765")await server.wait_closed()if __name__ =="__main__":try: asyncio.run(main())except KeyboardInterrupt: logger.info("Shutting down...")3. 前端页面
创建 templates/index.html 文件:
<!-- templates/index.html --><!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><title>IoT Temperature Monitor</title><scriptsrc="https://cdn.jsdelivr.net/npm/chart.js"></script><style>body{font-family: Arial, sans-serif;margin: 20px;background-color: #f5f5f5;}.container{max-width: 1200px;margin: auto;background-color: white;padding: 20px;border-radius: 8px;box-shadow: 0 2px 10px rgba(0,0,0,0.1);}h1{color: #333;text-align: center;}.data-display{display: flex;justify-content: space-around;margin-bottom: 20px;flex-wrap: wrap;}.sensor-card{background-color: #e9ecef;padding: 15px;border-radius: 5px;margin: 10px;min-width: 200px;text-align: center;}.sensor-value{font-size: 2em;font-weight: bold;}.chart-container{width: 100%;height: 400px;margin-top: 20px;}.status{text-align: center;margin-top: 10px;font-weight: bold;}.status.connected{color: green;}.status.disconnected{color: red;}</style></head><body><divclass="container"><h1>IoT Temperature & Humidity Monitor</h1><divid="status"class="status disconnected">Connecting...</div><divclass="data-display"><divclass="sensor-card"><h3>Temperature</h3><divid="temperature"class="sensor-value">-- °C</div></div><divclass="sensor-card"><h3>Humidity</h3><divid="humidity"class="sensor-value">-- %</div></div><divclass="sensor-card"><h3>Last Update</h3><divid="timestamp"class="sensor-value">--</div></div></div><divclass="chart-container"><canvasid="sensorChart"></canvas></div></div><scriptsrc="/static/script.js"></script></body></html>4. 前端 JavaScript
创建 static/script.js 文件:
// static/script.jsconst wsUri ="ws://localhost:8765";let websocket;let chart; document.addEventListener('DOMContentLoaded',function(){const ctx = document.getElementById('sensorChart').getContext('2d'); chart =newChart(ctx,{type:'line',data:{labels:[],// 时间标签datasets:[{label:'Temperature (°C)',data:[],borderColor:'rgb(255, 99, 132)',backgroundColor:'rgba(255, 99, 132, 0.2)',tension:0.1,fill:false},{label:'Humidity (%)',data:[],borderColor:'rgb(54, 162, 235)',backgroundColor:'rgba(54, 162, 235, 0.2)',tension:0.1,fill:false}]},options:{responsive:true,maintainAspectRatio:false,scales:{x:{title:{display:true,text:'Time'}},y:{beginAtZero:false}},plugins:{legend:{position:'top',},title:{display:true,text:'Real-time Sensor Data'}}}});connectWebSocket();});functionconnectWebSocket(){ console.log("Attempting to connect to WebSocket server..."); websocket =newWebSocket(wsUri); websocket.onopen=function(event){ console.log("WebSocket connection opened."); document.getElementById('status').textContent ="Connected"; document.getElementById('status').className ="status connected";}; websocket.onclose=function(event){ console.log("WebSocket connection closed."); document.getElementById('status').textContent ="Disconnected"; document.getElementById('status').className ="status disconnected";// 尝试重新连接setTimeout(connectWebSocket,5000);}; websocket.onerror=function(error){ console.error("WebSocket error:", error); document.getElementById('status').textContent ="Connection Error"; document.getElementById('status').className ="status disconnected";}; websocket.onmessage=function(event){try{const data =JSON.parse(event.data); console.log("Received data:", data);// 更新 DOM 元素 document.getElementById('temperature').textContent = data.temperature.toFixed(2)+' °C'; document.getElementById('humidity').textContent = data.humidity.toFixed(2)+' %';// 显示人类可读的时间const date =newDate(data.timestamp *1000); document.getElementById('timestamp').textContent = date.toLocaleString();// 更新图表updateChart(data);}catch(e){ console.error("Error parsing JSON:", e);}};}functionupdateChart(data){const timestamp =newDate(data.timestamp *1000).toLocaleTimeString();const temp = data.temperature;const humidity = data.humidity;// 添加新数据点 chart.data.labels.push(timestamp); chart.data.datasets[0].data.push(temp);// 温度 chart.data.datasets[1].data.push(humidity);// 湿度// 限制图表上的数据点数量(例如,保留最近的 20 个)if(chart.data.labels.length >20){ chart.data.labels.shift(); chart.data.datasets[0].data.shift(); chart.data.datasets[1].data.shift();}// 更新图表 chart.update();}5. 运行项目
步骤 1: 启动 MQTT Broker
你需要先有一个 MQTT 代理运行。最简单的方法是使用 Docker 运行 Mosquitto:
docker run -d \ --name mosquitto \ -p 1883:1883 \ -p 9001:9001 \ --restart unless-stopped \ eclipse-mosquitto 或者,如果你在本地安装了 Mosquitto,请确保服务已启动。
步骤 2: 启动主应用
在终端中运行:
python3 app.py 这将启动:
- Flask Web 服务器 (监听端口 5000)
- WebSocket 服务器 (监听端口 8765)
- MQTT 客户端 (订阅主题
iot/sensors/temperature)
步骤 3: 启动 MQTT 发布者
在另一个终端中运行:
python3 mqtt_publisher.py 这个脚本会每隔 5 秒向 MQTT 主题 iot/sensors/temperature 发布一条模拟传感器数据。
步骤 4: 访问前端页面
打开浏览器访问 http://localhost:5000。
你应该能看到:
- 实时更新的温度和湿度数值
- 最近一次更新的时间
- 一个实时折线图,显示温度和湿度随时间的变化
总结
这个项目展示了如何构建一个基本的物联网数据采集与可视化系统。我们使用了 MQTT 进行设备间通信,通过 Python 服务处理数据,并使用 WebSocket 实现实时推送,最后通过前端页面进行可视化展示。
这个系统具有良好的扩展性:
- 可以轻松添加更多的传感器类型
- 可以集成更复杂的后端处理逻辑
- 可以使用不同的可视化库或框架
- 可以将数据存储到数据库中进行历史查询
通过这种方式,你可以快速搭建一个可用于监控和分析 IoT 数据的原型系统。