C++代码建立mqtt客户端发送数据和接受mqtt消息

2023-12-24 21:26:27

首先,声明一下,因为懒的原因,将mqtt的小文章搁置了两个月,现在终于要补上了!哈哈

一.什么是mqtt以及用途和优点

MQTT(Message Queuing Telemetry Transport)是一种轻量级、基于发布-订阅模式的消息传输协议,适用于资源受限的设备和低带宽、高延迟或不稳定的网络环境。它在物联网应用中广受欢迎,能够实现传感器、执行器和其它设备之间的高效通信。

MQTT用于收发消息的主要组件有:

  • Publisher(发布者)
  • Broker(代理)
  • Subscriber(订阅者)

三者的关系如下图:

从图中可以看出,如果把发送消息或接收消息的端成为Client,那么一个Client即可以是发布者(Publisher)也可以是订阅者(Subscriber)。因为一个client可以通过实现publish接口发送消息,也可以用过实现subscribe接口订阅消息。

代理(Broker)是链接发布者(Publisher)和订阅者(Subscriber)的中心。一个代理可以链接上千个Client。代理的职责是接收所有发布者(Publisher)的消息,判断消息应该发往哪个订阅者(Subscriber)

详细介绍移步至:MQTT 协议入门:基础知识和快速教程 | EMQ (emqx.com)

?二.环境搭建

涉及到3个,一个是客户端工具,一个是服务端,最后是客户端发布订阅代码实现,用于发布和订阅。

一.mqtt客户端工具及

mqtt客户端工具可以不用代码发送你想要发送的数据到各个订阅端(mqtt发布订阅等参照上面的链接),也可以用于测试写出的代码是否可以正常发布数据(客户端工具相当于一个)。

下载方式请自行搜索

二 服务端

不管是订阅还是发送都是一个client,而服务则是一个BROKER(代理)。

下载链接 :Download EMQX

GitHub - emqx/qmqtt: MQTT client for Qt

三 发布订阅代码实现

这里只实现发布的C++代码。代码编写的逻辑是:写一个mqtt发布端,在链接到代理之后开始发布缓存中的数据,这里通过定时器每秒发送数据给到代理,如果某个订阅者订阅到消息则会触发回调函数,以显示订阅者收到了数据。

main.cpp:

#include "mqttclient.hpp"
#include <Qapplication>
#include <mutex>
/*
1.收到消息的结果回调
2.qtimer,到了时间自动publish
3.异步方式(生产者消费者方式)
3.满足用户不同方式使用客户端(多个构造)
*/
const QHostAddress EXAMPLE_HOST = QHostAddress("127.0.0.1");
const quint16 EXAMPLE_PORT = 1883;
const QString EXAMPLE_TOPIC = "mqtt/lmy";
#if 1
int main(int argc,char **argv)
{
	QApplication app(argc, argv);
	//QHostAddress ip(EXAMPLE_HOST);
	std::shared_ptr<main_test>ptr;
	mqtt_test mtest(EXAMPLE_HOST, EXAMPLE_PORT);

	mtest.SetTopic(EXAMPLE_TOPIC);
	mtest.SetCallback( ptr);


	for (int i = 0; i < 100; ++i)
	{
		std::unique_lock<std::mutex>lock(mtest.m_mtx);
		mtest.m_queue.push(i);
	}

	return app.exec();
}
#include "main.moc"
#endif
//#include <filesystem>
//namespace fs = std::filesystem;
//int main()
//{
//	std::cout<<fs::current_path().generic_string();
//	return 0;
//}

?mqtt 发布类代码实现:

#ifndef MQTTCLIENT_HPP
#define MQTTCLIENT_HPP

#include <iostream>
#include <Qdebug>
#include <QObject>
#include <qmqtt.h>
#include <queue>
#include <QTimer>
#include <mutex>
#include <memory>
using namespace QMQTT;
const QString EXAMPLE_NAME = "LMY";

class Oncallback
{
public:
	virtual void Callback() = 0;
};

class main_test :public Oncallback
{
	void Callback()
	{
		std::cout << "recive meassage ";
	}
};


class mqtt_test :public QObject
{
	Q_OBJECT
public:
	QHostAddress m_ip;
	quint16 m_port;
	QString m_topic;
	QMQTT::Client* m_client;
	QTimer m_timer;
	std::queue<int>m_queue;
	std::mutex m_mtx;
	std::shared_ptr<Oncallback> m_call;
public:
	mqtt_test(QHostAddress ip, quint16 port = 1883)
	{
		m_client = new Client();
		m_client->setHost(ip);
		m_client->setPort(port);
		//m_client->setHostName(EXAMPLE_NAME);
		m_client->connectToHost();
		connect(m_client, &Client::connected,this,
			&mqtt_test::Onconnected);
		connect(&m_timer, &QTimer::timeout, this,
			&mqtt_test::Published);
		
		connect(m_client, &Client::received, this,
			&mqtt_test::Received);
		connect(m_client, &Client::disconnected, this,
			&mqtt_test::OnDisconnected);
	}
	~mqtt_test() { m_client->disconnectFromHost(); }


public slots:
	void SetTopic(QString topic)
	{
		//m_port = PORT;
		//m_ip = IP;
		m_topic = topic;
		m_client->subscribe(topic, 0);
	}
	
	bool SetCallback(std::shared_ptr<Oncallback>ptr)
	{
		m_call = ptr;
		return true;
	}

	void Onconnected()
	{
		m_timer.start(1);
		qDebug() << "connected:";
	}

	void Published()
	{
		int x;
		{
			std::unique_lock < std::mutex >lock(m_mtx);
			if (!m_queue.empty())
			{
				x = m_queue.front();
				if (x == 99) 
				{ 
					std::cout << x;
				}
				m_queue.pop();
			}
		}
		if (m_queue.empty())return;
		
		QMQTT::Message message(x, m_topic,QString("publish number %1").arg(x).toUtf8());
		m_client->publish(message);
		qDebug()<< "PUBLISH:" ;
	}

	void OnDisconnected()
	{
		qDebug() << "disconnected:";
		m_timer.stop();
	}

	void Received(const QMQTT::Message& message)
	{
		//开始回调
		m_call->Callback();
		qDebug() << "publish received: \"" << QString::fromUtf8(message.payload());
	}

};
#endif // !MQTTCLIENT_HPP



?代理收到了发布者的消息并显示在界面上。注意订阅主题需要按规范且保持一致。

?这里mqtt客户类的构造中,使用信号槽方式来接受和处理信号。

??? ?mqtt_test(QHostAddress ip, quint16 port = 1883)
?? ?{
?? ??? ?m_client = new Client();
?? ??? ?m_client->setHost(ip);
?? ??? ?m_client->setPort(port);
?? ??? ?//m_client->setHostName(EXAMPLE_NAME);
?? ??? ?m_client->connectToHost();
?? ??? ?connect(m_client, &Client::connected,this,
?? ??? ??? ?&mqtt_test::Onconnected);
// 一旦收到了链接成功的信号则会触发Onconnected()函数
?? ??? ?connect(&m_timer, &QTimer::timeout, this,
?? ??? ??? ?&mqtt_test::Published);
?? ??? ?
?? ??? ?connect(m_client, &Client::received, this,
?? ??? ??? ?&mqtt_test::Received);//
这里没收到回复,则不调动槽函数执行Received().
?? ??? ?connect(m_client, &Client::disconnected, this,
?? ??? ??? ?&mqtt_test::OnDisconnected);
?? ?}

?需要注意的是,这里并没有用到集成mqtt类去实现(不会),而是直接使用qmqtt的头文件加DLL库的形式调动qmqtt。

文章来源:https://blog.csdn.net/lmy347771232/article/details/135185343
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。