跳到主要内容MQTT 通信协议 Mosquitto 发布订阅 C 语言实现示例 | 极客日志C
MQTT 通信协议 Mosquitto 发布订阅 C 语言实现示例
基于 Mosquitto 库的 C 语言 MQTT 客户端开发指南。涵盖同步与异步通信模式,展示发布订阅核心回调机制及线程安全处理。提供完整代码示例与编译配置,帮助开发者快速构建稳定可靠的物联网消息服务。
FrontendX0 浏览 MQTT 客户端开发:基于 Mosquitto 的 C 语言实践
在物联网开发中,MQTT 协议因其轻量级和发布/订阅模式而被广泛采用。Mosquitto 是成熟的开源 MQTT 代理(Broker),其附带的客户端库 libmosquitto 提供了丰富的 C API。本文将通过实际代码演示如何使用 libmosquitto 库实现同步与异步的发布订阅功能。
准备工作
编译安装 mosquitto 后,主要需要用到以下文件:
libmosquitto.so.1 (动态链接库)
mosquitto.h (头文件)
下面的示例均使用标准 C 语言编写,只需包含头文件并链接 -lmosquitto 即可。
同步模式
同步模式下,程序会阻塞等待网络事件。这种方式逻辑简单,适合对实时性要求不高或单线程场景。注意:下面的订阅端在主线程中循环调用 mosquitto_loop;发布端由于主线程要阻塞读取标准输入,仍通过 mosquitto_loop_start 启动了后台网络线程。
订阅端 (sub.c)
订阅端主要负责连接服务器、订阅主题并监听消息回调。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "mosquitto.h"
/* Broker host name handed to mosquitto_connect() in main(). */
#define HOST "localhost"
/* Broker TCP port handed to mosquitto_connect() in main(). */
#define PORT 1883
/* Keep-alive argument handed to mosquitto_connect() in main(). */
#define KEEP_ALIVE 60
/* Not referenced by the subscriber code in this listing. */
#define MSG_MAX_SIZE 512
/* Main-loop flag: main() keeps polling while it is non-zero; the disconnect
 * callback sets it to 0. */
static int running = 1;
void my_connect_callback(struct mosquitto *mosq, void *obj, int rc) {
printf("Call the function: on_connect\n");
if(rc) {
printf("on_connect error!\n");
exit(1);
} else {
(mosquitto_subscribe(mosq, , , )) {
();
();
}
}
}
{
();
running = ;
}
{
();
}
{
();
(, ( *)msg->topic, ( *)msg->payload);
( == (msg->payload, )) {
mosquitto_disconnect(mosq);
}
}
{
ret;
ret = mosquitto_lib_init();
(ret) {
();
;
}
mosq = mosquitto_new(, , );
(mosq == ) {
();
mosquitto_lib_cleanup();
;
}
mosquitto_connect_callback_set(mosq, my_connect_callback);
mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
mosquitto_message_callback_set(mosq, my_message_callback);
ret = mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE);
(ret) {
();
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
;
}
();
(running) {
mosquitto_loop(mosq, , );
}
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
();
;
}
if
NULL
"topic1"
2
printf
"Set the topic error!\n"
exit
1
void
my_disconnect_callback
(struct mosquitto *mosq, void *obj, int rc)
printf
"Call the function: my_disconnect_callback\n"
0
void
my_subscribe_callback
(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos)
printf
"Call the function: on_subscribe\n"
void
my_message_callback
(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg)
printf
"Call the function: on_message\n"
printf
"Receive a message of %s : %s\n"
char
char
if
0
strcmp
"quit"
int
main
()
int
struct mosquitto *mosq;
if
printf
"Init lib error!\n"
return
-1
"sub_test"
true
NULL
if
NULL
printf
"New sub_test error!\n"
return
-1
if
printf
"Connect server error!\n"
return
-1
printf
"Start!\n"
while
-1
1
printf
"End!\n"
return
0
发布端 (pub.c)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "mosquitto.h"
/* Broker host name handed to mosquitto_connect() in main(). */
#define HOST "localhost"
/* Broker TCP port handed to mosquitto_connect() in main(). */
#define PORT 1883
/* Keep-alive argument handed to mosquitto_connect() in main(). */
#define KEEP_ALIVE 60
/* Size in bytes of the stdin line buffer declared in main(). */
#define MSG_MAX_SIZE 512
/* Set to 0 by my_disconnect_callback(); main() in this listing does not read it. */
static int running = 1;
/* Connect callback: logs that it was invoked; every argument is ignored. */
void my_connect_callback(struct mosquitto *mosq, void *obj, int rc)
{
    (void)mosq;
    (void)obj;
    (void)rc;
    fputs("Call the function: my_connect_callback\n", stdout);
}
/* Disconnect callback: logs the call, then clears the file-level `running` flag. */
void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
{
    (void)mosq;
    (void)obj;
    (void)rc;
    fputs("Call the function: my_disconnect_callback\n", stdout);
    running = 0;
}
/* Publish callback: logs that it was invoked; every argument is ignored. */
void my_publish_callback(struct mosquitto *mosq, void *obj, int mid)
{
    (void)mosq;
    (void)obj;
    (void)mid;
    fputs("Call the function: my_publish_callback\n", stdout);
}
int main() {
int ret;
struct mosquitto *mosq;
char buff[MSG_MAX_SIZE];
ret = mosquitto_lib_init();
if(ret) {
printf("Init lib error!\n");
return -1;
}
mosq = mosquitto_new("pub_test", true, NULL);
if(mosq == NULL) {
printf("New pub_test error!\n");
mosquitto_lib_cleanup();
return -1;
}
mosquitto_connect_callback_set(mosq, my_connect_callback);
mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
mosquitto_publish_callback_set(mosq, my_publish_callback);
ret = mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE);
if(ret) {
printf("Connect server error!\n");
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return -1;
}
printf("Start!\n");
int loop = mosquitto_loop_start(mosq);
if(loop != MOSQ_ERR_SUCCESS) {
printf("mosquitto loop error\n");
return 1;
}
while(fgets(buff, MSG_MAX_SIZE, stdin) != NULL) {
mosquitto_publish(mosq, NULL, "topic1", strlen(buff)+1, buff, 0, 0);
memset(buff, 0, sizeof(buff));
}
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
printf("End!\n");
return 0;
}
编译配置
# Builds the subscriber (sub) and publisher (pub) examples against libmosquitto.
# Recipe lines must start with a TAB character.
.PHONY: all clean

all:
	@echo "Start compiling..."
	gcc -o sub sub.c -lmosquitto
	gcc -o pub pub.c -lmosquitto
	@echo "end"

# Listing the source as a prerequisite makes the binary rebuild when it changes.
sub: sub.c
	gcc -o sub sub.c -lmosquitto

pub: pub.c
	gcc -o pub pub.c -lmosquitto

clean:
	-rm sub pub
异步模式
异步模式利用多线程处理网络 IO,主线程不会被阻塞,更适合高并发或需要与其他业务逻辑交互的场景。
- 同步:
mosquitto_connect + mosquitto_loop (阻塞)
- 异步:
mosquitto_connect_async + mosquitto_loop_start (创建内部线程)
订阅端 (sub.c)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "mosquitto.h"
/* Broker host name handed to mosquitto_connect_async() in main(). */
#define HOST "localhost"
/* Broker TCP port handed to mosquitto_connect_async() in main(). */
#define PORT 1883
/* Keep-alive argument handed to mosquitto_connect_async() in main(). */
#define KEEP_ALIVE 60
/* Not referenced by the subscriber code in this listing. */
#define MSG_MAX_SIZE 512
/* Main-loop flag: main() sleeps while it is non-zero; my_disconnect_callback()
 * sets it to 0. */
static int running = 1;
/*
 * Connect callback.
 * Exits the process when the connect result is non-zero; otherwise subscribes
 * to "topic2" with QoS 2 and exits if the subscribe request fails.
 */
void my_connect_callback(struct mosquitto *mosq, void *obj, int rc)
{
    int sub_rc;

    (void)obj;
    fputs("Call the function: on_connect\n", stdout);

    if (rc != 0) {
        fputs("on_connect error!\n", stdout);
        exit(1);
    }

    sub_rc = mosquitto_subscribe(mosq, NULL, "topic2", 2);
    if (sub_rc != 0) {
        fputs("Set the topic error!\n", stdout);
        exit(1);
    }
}
/* Disconnect callback: logs the call, then clears `running` so main() stops waiting. */
void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
{
    (void)mosq;
    (void)obj;
    (void)rc;
    fputs("Call the function: my_disconnect_callback\n", stdout);
    running = 0;
}
/* Subscribe callback: logs that it was invoked; every argument is ignored. */
void my_subscribe_callback(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos)
{
    (void)mosq;
    (void)obj;
    (void)mid;
    (void)qos_count;
    (void)granted_qos;
    fputs("Call the function: on_subscribe\n", stdout);
}
/*
 * Message callback: prints the topic and payload of each received message and
 * requests a disconnect when the payload is the command "quit".
 *
 * Empty payloads are reported as "(null)" instead of being passed to string
 * functions. The payload is printed with an explicit length bound. Trailing
 * NUL, '\n' and '\r' bytes are ignored when matching "quit", because the
 * publishers in this article forward whole fgets() lines (newline included)
 * plus the terminating NUL.
 */
void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg)
{
    const char *text = msg->payload;
    int len = msg->payloadlen;

    (void)obj;
    printf("Call the function: on_message\n");

    if (text == NULL || len <= 0) {
        printf("Receive a message of %s : (null)\n", (char *)msg->topic);
        return;
    }

    /* %.*s stops at payloadlen bytes or at the first NUL, whichever comes first. */
    printf("Receive a message of %s : %.*s\n", (char *)msg->topic, len, text);

    while (len > 0 &&
           (text[len - 1] == '\0' || text[len - 1] == '\n' || text[len - 1] == '\r')) {
        len--;
    }

    if (len == 4 && memcmp(text, "quit", 4) == 0) {
        mosquitto_disconnect(mosq);
    }
}
/*
 * Asynchronous subscriber: creates the "sub_test" client, registers the
 * callbacks, issues a non-blocking connect and starts the library's network
 * thread. The main thread then sleeps in one-second steps until the
 * disconnect callback clears `running`.
 * Returns 0 on normal shutdown and -1 on any setup failure.
 */
int main()
{
    struct mosquitto *mosq;
    int ret = mosquitto_lib_init();

    if (ret != 0) {
        fputs("Init lib error!\n", stdout);
        return -1;
    }

    mosq = mosquitto_new("sub_test", true, NULL);
    if (!mosq) {
        fputs("New sub_test error!\n", stdout);
        goto fail_lib;
    }

    mosquitto_connect_callback_set(mosq, my_connect_callback);
    mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
    mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
    mosquitto_message_callback_set(mosq, my_message_callback);

    ret = mosquitto_connect_async(mosq, HOST, PORT, KEEP_ALIVE);
    if (ret != 0) {
        fputs("Connect server error!\n", stdout);
        goto fail_client;
    }

    ret = mosquitto_loop_start(mosq);
    if (ret != 0) {
        fputs("Start loop error!\n", stdout);
        goto fail_client;
    }

    fputs("Start!\n", stdout);

    /* Idle here; all MQTT work happens on the thread started above. */
    for (;;) {
        if (!running) {
            break;
        }
        sleep(1);
    }

    mosquitto_loop_stop(mosq, false);
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
    fputs("End!\n", stdout);
    return 0;

fail_client:
    mosquitto_destroy(mosq);
fail_lib:
    mosquitto_lib_cleanup();
    return -1;
}
发布端 (pub.c)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "mosquitto.h"
/* Broker host name handed to mosquitto_connect_async() in main(). */
#define HOST "localhost"
/* Broker TCP port handed to mosquitto_connect_async() in main(). */
#define PORT 1883
/* Keep-alive argument handed to mosquitto_connect_async() in main(). */
#define KEEP_ALIVE 60
/* Size in bytes of the stdin line buffer declared in main(). */
#define MSG_MAX_SIZE 512
/* Set to 0 by my_disconnect_callback(); main() in this listing does not read it. */
static int running = 1;
/* Connect callback: logs that it was invoked; every argument is ignored. */
void my_connect_callback(struct mosquitto *mosq, void *obj, int rc)
{
    (void)mosq;
    (void)obj;
    (void)rc;
    fputs("Call the function: my_connect_callback\n", stdout);
}
/* Disconnect callback: logs the call, then clears the file-level `running` flag. */
void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
{
    (void)mosq;
    (void)obj;
    (void)rc;
    fputs("Call the function: my_disconnect_callback\n", stdout);
    running = 0;
}
/* Publish callback: logs that it was invoked; every argument is ignored. */
void my_publish_callback(struct mosquitto *mosq, void *obj, int mid)
{
    (void)mosq;
    (void)obj;
    (void)mid;
    fputs("Call the function: my_publish_callback\n", stdout);
}
int main() {
int ret;
struct mosquitto *mosq;
char buff[MSG_MAX_SIZE];
ret = mosquitto_lib_init();
if(ret) {
printf("Init lib error!\n");
return -1;
}
mosq = mosquitto_new("pub_test", true, NULL);
if(mosq == NULL) {
printf("New pub_test error!\n");
mosquitto_lib_cleanup();
return -1;
}
mosquitto_connect_callback_set(mosq, my_connect_callback);
mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
mosquitto_publish_callback_set(mosq, my_publish_callback);
ret = mosquitto_connect_async(mosq, HOST, PORT, KEEP_ALIVE);
if(ret) {
printf("Connect server error!\n");
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return -1;
}
int loop = mosquitto_loop_start(mosq);
if(loop != MOSQ_ERR_SUCCESS) {
printf("mosquitto loop error\n");
return 1;
}
printf("Start!\n");
while(fgets(buff, MSG_MAX_SIZE, stdin) != NULL) {
mosquitto_publish(mosq, NULL, "topic2", strlen(buff)+1, buff, 0, 0);
memset(buff, 0, sizeof(buff));
}
mosquitto_loop_stop(mosq, false);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
printf("End!\n");
return 0;
}
可订阅可发布模式
在实际场景中,客户端可能既需要发布也需要订阅。下面展示一个双向收发的示例。
客户端代码 (pub_sub.c)
注意此处使用了 <stdbool.h> 来支持布尔类型。
#include <stdio.h>
#include <stdlib.h>
#include <mosquitto.h>
#include <string.h>
#include <stdbool.h>
/* Broker host name handed to mosquitto_connect() in main(). */
#define HOST "localhost"
/* Broker TCP port handed to mosquitto_connect() in main(). */
#define PORT 1883
/* Keep-alive argument handed to mosquitto_connect() in main(). */
#define KEEP_ALIVE 60
/* Size in bytes of the stdin line buffer declared in main(). */
#define MSG_MAX_SIZE 512
/* Passed as the second argument of mosquitto_new() in main(). */
bool session = true;
/*
 * Message callback: prints "<topic> <payload>" for a non-empty payload, or
 * "<topic> (null)" followed by a newline for an empty one, then flushes
 * stdout. No newline is appended after a non-empty payload.
 */
void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
    (void)mosq;
    (void)userdata;

    if (message->payloadlen == 0) {
        printf("%s (null)\n", message->topic);
    } else {
        printf("%s %s", message->topic, (char *)message->payload);
    }

    fflush(stdout);
}
void my_connect_callback(struct mosquitto *mosq, void *userdata, int result) {
int i;
if(!result) {
mosquitto_subscribe(mosq, NULL, "topic2 ", 2);
} else {
fprintf(stderr, "Connect failed\n");
}
}
/*
 * Subscribe callback: prints the message id followed by the comma-separated
 * granted QoS values on one line. The first entry of granted_qos is read
 * unconditionally.
 */
void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
    int idx = 1;

    (void)mosq;
    (void)userdata;

    printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);
    while (idx < qos_count) {
        printf(", %d", granted_qos[idx]);
        idx++;
    }
    putchar('\n');
}
/* Log callback: writes the given log line to stdout followed by a newline. */
void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
    (void)mosq;
    (void)userdata;
    (void)level;
    fprintf(stdout, "%s\n", str);
}
int main() {
struct mosquitto *mosq = NULL;
char buff[MSG_MAX_SIZE];
mosquitto_lib_init();
mosq = mosquitto_new(NULL, session, NULL);
if(!mosq) {
printf("create client failed..\n");
mosquitto_lib_cleanup();
return 1;
}
mosquitto_connect_callback_set(mosq, my_connect_callback);
mosquitto_message_callback_set(mosq, my_message_callback);
mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
if(mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE)) {
fprintf(stderr, "Unable to connect.\n");
return 1;
}
int loop = mosquitto_loop_start(mosq);
if(loop != MOSQ_ERR_SUCCESS) {
printf("mosquitto loop error\n");
return 1;
}
while(fgets(buff, MSG_MAX_SIZE, stdin) != NULL) {
mosquitto_publish(mosq, NULL, "topic1 ", strlen(buff)+1, buff, 0, 0);
memset(buff, 0, sizeof(buff));
}
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return 0;
}
运行此客户端时,它既能接收 topic2 的消息,也能向 topic1 发送消息,实现了双向通信。
相关免费在线工具
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
- Markdown转HTML
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
- HTML转Markdown
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
- JSON 压缩
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
- JSON美化和格式化
将JSON字符串修饰为友好的可读格式。 在线工具,JSON美化和格式化在线工具,online