1. 前言
JetLinks 是轻量级物联网开源平台,MQTT 协议作为其核心设备通信协议,是物联网教学与实操的重点内容。设备属性主动上报是物联网设备与平台交互的基础场景,本文聚焦教学场景,基于 Python 语言结合 paho-mqtt 库,实现适配 MQTT 3.1.1 协议的 JetLinks 设备属性周期性上报功能。通过明确上报主题、报文格式规范,以及 JetLinks 物模型配置要点,帮助学习者理解物联网设备侧 MQTT 通信的核心逻辑,同时解决协议版本兼容、连接稳定性、属性解析等实操中的常见问题,为后续物联网设备开发打下基础。
2. MQTT 主题及报文格式
报文格式
{"properties":{"temperature":25.5}}
主题格式
/{productId:产品 ID}/{deviceId:设备 ID}/properties/report
示例:/2017982230047920128/2017982371131723776/properties/report
3. JetLinks 配置
主要是配置产品的物模型
[图:JetLinks 物模型配置示例]
务必将 读 写 上报 都选上,否则会有问题
4. Python 实现
务必指明是 3.11 版本,5.0 版本应该会有问题,3.0 应该也没问题
import json
import logging
import time
import threading
from paho.mqtt import client as mqtt_client
from paho.mqtt.client import MQTTv311
import random
# ===================== 配置项 ======================
MQTT_BROKER = "192.168.147.134"
MQTT_PORT = 1883
MQTT_USERNAME = "admin"
MQTT_PASSWORD = "admin"
CLIENT_ID = "2017982371131723776"
# 主动上报配置(重点)
REPORT_TOPIC =
REPORT_INTERVAL =
DEFAULT_PROPERTIES = {
: ,
:
}
logging.basicConfig(
level=logging.INFO,
=
)
logger = logging.getLogger(__name__)
() -> mqtt_client.Client:
reconnect_count =
():
reconnect_count
reconnect_count =
rc_msg = {
: ,
: ,
: ,
: ,
: ,
: ,
}
rc == :
logger.info()
:
logger.error()
():
reconnect_count
reconnect_count +=
rc == :
logger.info()
:
disconnect_msg = {
: ,
: ,
: ,
: ,
: ,
: ,
}
logger.warning()
time.sleep((reconnect_count * , ))
client = mqtt_client.Client(
client_id=CLIENT_ID,
callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1,
protocol=MQTTv311,
)
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.auto_reconnect =
client.reconnect_delay_set(min_delay=, max_delay=)
:
client.connect(MQTT_BROKER, MQTT_PORT, keepalive=)
Exception e:
logger.error()
client
() -> :
property_keywords = [, , , ]
(keyword topic.lower() keyword property_keywords):
{}
:
payload_data = json.loads(payload)
payload_data.get(, payload_data)
json.JSONDecodeError:
:
properties = {}
item payload.split():
k, v = item.split(, )
v.replace(, ).isdigit():
v = (v) v (v)
properties[k.strip()] = v
properties
:
logger.error()
{}
():
topic = msg.topic
payload = msg.payload.decode(, errors=)
logger.info()
logger.info()
logger.info()
properties = parse_property_message(topic, payload)
properties:
logger.info()
(, , encoding=) f:
log_time = logging.Formatter().(logging.LogRecord(, , , , , (), ))
f.write()
:
logger.info()
():
client.subscribe(, qos=)
client.on_message = on_message
logger.info()
():
logger.info()
:
:
dynamic_properties = {
: (DEFAULT_PROPERTIES[] + random.uniform(-, ), )
}
report_payload = json.dumps({: dynamic_properties}, ensure_ascii=)
result = client.publish(REPORT_TOPIC, report_payload, qos=)
status = result[]
status == :
logger.info()
logger.info()
logger.info()
:
logger.error()
Exception e:
logger.error()
time.sleep(REPORT_INTERVAL)
():
logger.info()
client = connect_mqtt()
subscribe_all_topics(client)
publish_thread = threading.Thread(
target=publish_properties_periodically,
args=(client,),
daemon=
)
publish_thread.start()
client.loop_forever()
__name__ == :
:
run()
KeyboardInterrupt:
logger.info()
Exception e:
logger.error(, exc_info=)

