C++代码建立mqtt客户端发送数据和接受mqtt消息
首先,声明一下,因为懒的原因,将mqtt的小文章搁置了两个月,现在终于要补上了!哈哈
一.什么是mqtt以及用途和优点
MQTT(Message Queuing Telemetry Transport)是一种轻量级、基于发布-订阅模式的消息传输协议,适用于资源受限的设备和低带宽、高延迟或不稳定的网络环境。它在物联网应用中广受欢迎,能够实现传感器、执行器和其它设备之间的高效通信。
MQTT用于收发消息的主要组件有:
- Publisher(发布者)
- Broker(代理)
- Subscriber(订阅者)
三者的关系如下图:
从图中可以看出,如果把发送消息或接收消息的端成为Client,那么一个Client即可以是发布者(Publisher)也可以是订阅者(Subscriber)。因为一个client可以通过实现publish接口发送消息,也可以用过实现subscribe接口订阅消息。
代理(Broker)是链接发布者(Publisher)和订阅者(Subscriber)的中心。一个代理可以链接上千个Client。代理的职责是接收所有发布者(Publisher)的消息,判断消息应该发往哪个订阅者(Subscriber)
详细介绍移步至:MQTT 协议入门:基础知识和快速教程 | EMQ (emqx.com)
?二.环境搭建
涉及到3个,一个是客户端工具,一个是服务端,最后是客户端发布订阅代码实现,用于发布和订阅。
一.mqtt客户端工具及
mqtt客户端工具可以不用代码发送你想要发送的数据到各个订阅端(mqtt发布订阅等参照上面的链接),也可以用于测试写出的代码是否可以正常发布数据(客户端工具相当于一个)。
下载方式请自行搜索
二 服务端
不管是订阅还是发送都是一个client,而服务则是一个BROKER(代理)。
下载链接 :Download EMQX
GitHub - emqx/qmqtt: MQTT client for Qt
三 发布订阅代码实现
这里只实现发布的C++代码。代码编写的逻辑是:写一个mqtt发布端,在链接到代理之后开始发布缓存中的数据,这里通过定时器每秒发送数据给到代理,如果某个订阅者订阅到消息则会触发回调函数,以显示订阅者收到了数据。
main.cpp:
#include "mqttclient.hpp"
#include <Qapplication>
#include <mutex>
/*
1.收到消息的结果回调
2.qtimer,到了时间自动publish
3.异步方式(生产者消费者方式)
3.满足用户不同方式使用客户端(多个构造)
*/
const QHostAddress EXAMPLE_HOST = QHostAddress("127.0.0.1");
const quint16 EXAMPLE_PORT = 1883;
const QString EXAMPLE_TOPIC = "mqtt/lmy";
#if 1
int main(int argc,char **argv)
{
QApplication app(argc, argv);
//QHostAddress ip(EXAMPLE_HOST);
std::shared_ptr<main_test>ptr;
mqtt_test mtest(EXAMPLE_HOST, EXAMPLE_PORT);
mtest.SetTopic(EXAMPLE_TOPIC);
mtest.SetCallback( ptr);
for (int i = 0; i < 100; ++i)
{
std::unique_lock<std::mutex>lock(mtest.m_mtx);
mtest.m_queue.push(i);
}
return app.exec();
}
#include "main.moc"
#endif
//#include <filesystem>
//namespace fs = std::filesystem;
//int main()
//{
// std::cout<<fs::current_path().generic_string();
// return 0;
//}
?mqtt 发布类代码实现:
#ifndef MQTTCLIENT_HPP
#define MQTTCLIENT_HPP
#include <iostream>
#include <Qdebug>
#include <QObject>
#include <qmqtt.h>
#include <queue>
#include <QTimer>
#include <mutex>
#include <memory>
using namespace QMQTT;
const QString EXAMPLE_NAME = "LMY";
class Oncallback
{
public:
virtual void Callback() = 0;
};
class main_test :public Oncallback
{
void Callback()
{
std::cout << "recive meassage ";
}
};
class mqtt_test :public QObject
{
Q_OBJECT
public:
QHostAddress m_ip;
quint16 m_port;
QString m_topic;
QMQTT::Client* m_client;
QTimer m_timer;
std::queue<int>m_queue;
std::mutex m_mtx;
std::shared_ptr<Oncallback> m_call;
public:
mqtt_test(QHostAddress ip, quint16 port = 1883)
{
m_client = new Client();
m_client->setHost(ip);
m_client->setPort(port);
//m_client->setHostName(EXAMPLE_NAME);
m_client->connectToHost();
connect(m_client, &Client::connected,this,
&mqtt_test::Onconnected);
connect(&m_timer, &QTimer::timeout, this,
&mqtt_test::Published);
connect(m_client, &Client::received, this,
&mqtt_test::Received);
connect(m_client, &Client::disconnected, this,
&mqtt_test::OnDisconnected);
}
~mqtt_test() { m_client->disconnectFromHost(); }
public slots:
void SetTopic(QString topic)
{
//m_port = PORT;
//m_ip = IP;
m_topic = topic;
m_client->subscribe(topic, 0);
}
bool SetCallback(std::shared_ptr<Oncallback>ptr)
{
m_call = ptr;
return true;
}
void Onconnected()
{
m_timer.start(1);
qDebug() << "connected:";
}
void Published()
{
int x;
{
std::unique_lock < std::mutex >lock(m_mtx);
if (!m_queue.empty())
{
x = m_queue.front();
if (x == 99)
{
std::cout << x;
}
m_queue.pop();
}
}
if (m_queue.empty())return;
QMQTT::Message message(x, m_topic,QString("publish number %1").arg(x).toUtf8());
m_client->publish(message);
qDebug()<< "PUBLISH:" ;
}
void OnDisconnected()
{
qDebug() << "disconnected:";
m_timer.stop();
}
void Received(const QMQTT::Message& message)
{
//开始回调
m_call->Callback();
qDebug() << "publish received: \"" << QString::fromUtf8(message.payload());
}
};
#endif // !MQTTCLIENT_HPP
?代理收到了发布者的消息并显示在界面上。注意订阅主题需要按规范且保持一致。
?这里mqtt客户类的构造中,使用信号槽方式来接受和处理信号。
??? ?mqtt_test(QHostAddress ip, quint16 port = 1883)
?? ?{
?? ??? ?m_client = new Client();
?? ??? ?m_client->setHost(ip);
?? ??? ?m_client->setPort(port);
?? ??? ?//m_client->setHostName(EXAMPLE_NAME);
?? ??? ?m_client->connectToHost();
?? ??? ?connect(m_client, &Client::connected,this,
?? ??? ??? ?&mqtt_test::Onconnected); // 一旦收到了链接成功的信号则会触发Onconnected()函数
?? ??? ?connect(&m_timer, &QTimer::timeout, this,
?? ??? ??? ?&mqtt_test::Published);
?? ??? ?
?? ??? ?connect(m_client, &Client::received, this,
?? ??? ??? ?&mqtt_test::Received);//这里没收到回复,则不调动槽函数执行Received().
?? ??? ?connect(m_client, &Client::disconnected, this,
?? ??? ??? ?&mqtt_test::OnDisconnected);
?? ?}
?需要注意的是,这里并没有用到集成mqtt类去实现(不会),而是直接使用qmqtt的头文件加DLL库的形式调动qmqtt。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!