MFC连接mqtt服务器订阅和发送数据-自设计函数库

2023-12-31 09:44:01

以下是一个简单的MQTT连接库文件,其中包含了连接、断开、订阅主题、发送数据和接收数据等函数。请注意,这只是一个示例,你可能需要根据自己的实际需求进行修改。

#include <iostream>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>

const std::string SERVER_ADDRESS = "mqtt.server.com";
const int SERVER_PORT = 1883;

// MQTT固定报头结构
struct MqttFixedHeader {
    uint8_t controlPacketType;
    uint8_t remainingLength;
};

// MQTT连接报文结构
struct MqttConnectPacket {
    MqttFixedHeader fixedHeader;
    uint8_t variableHeader[10];
    uint8_t payload[20];
};

// MQTT订阅报文结构
struct MqttSubscribePacket {
    MqttFixedHeader fixedHeader;
    uint16_t packetIdentifier;
    uint8_t topic[20];
    uint8_t qos;
};

// MQTT发布报文结构
struct MqttPublishPacket {
    MqttFixedHeader fixedHeader;
    uint16_t topicLength;
    uint8_t topic[20];
    uint8_t payload[100];
};

class MqttClient {
public:
    MqttClient() : sockfd(-1), connected(false) {}

    ~MqttClient() {
        if (connected) {
            disconnect();
        }
    }

    bool connect(const std::string& clientId) {
        sockfd = socket(AF_INET, SOCK_STREAM, 0);
        if (sockfd < 0) {
            std::cerr << "Failed to create socket" << std::endl;
            return false;
        }

        struct sockaddr_in serv_addr;
        memset(&serv_addr, 0, sizeof(serv_addr));
        serv_addr.sin_family = AF_INET;
        serv_addr.sin_port = htons(SERVER_PORT);

        if (inet_pton(AF_INET, SERVER_ADDRESS.c_str(), &(serv_addr.sin_addr)) <= 0) {
            std::cerr << "Failed to set server address" << std::endl;
            return false;
        }

        if (connect(sockfd, reinterpret_cast<struct sockaddr*>(&serv_addr), sizeof(serv_addr)) < 0) {
            std::cerr << "Connection failed" << std::endl;
            return false;
        }

        // 构建MQTT连接报文
        MqttConnectPacket connectPacket;
        connectPacket.fixedHeader.controlPacketType = 0x10; // 连接请求
        connectPacket.fixedHeader.remainingLength = 0x0C; // 可变报头长度为12字节
        memcpy(connectPacket.variableHeader, "MQTT", 4);
        connectPacket.variableHeader[4] = 0x04; // MQTT协议版本号(4)
        connectPacket.variableHeader[5] = 0x02; // 连接标志
        connectPacket.variableHeader[6] = 0x00; // 保持连接时间的最高8位
        connectPacket.variableHeader[7] = 0x3C; // 保持连接时间的最低8位
        connectPacket.variableHeader[8] = 0x00; // 清理会话位为0
        connectPacket.variableHeader[9] = 0x00; // 预留位为0
        memcpy(connectPacket.payload, clientId.c_str(), clientId.length());

        // 发送MQTT连接报文
        if (send(sockfd, &connectPacket, sizeof(connectPacket), 0) < 0) {
            std::cerr << "Failed to send connect packet" << std::endl;
            return false;
        }

        // 接收MQTT服务器的响应
        uint8_t response[1024];
        ssize_t bytesRead = recv(sockfd, response, sizeof(response), 0);
        if (bytesRead <= 0) {
            std::cerr << "Failed to receive response" << std::endl;
            return false;
        }

        // 处理MQTT服务器的响应

        connected = true;

        return true;
    }

    void disconnect() {
        if (connected) {
            close(sockfd);
            sockfd = -1;
            connected = false;
        }
    }

    bool subscribe(const std::string& topic, uint8_t qos) {
        if (!connected) {
            std::cerr << "Not connected" << std::endl;
            return false;
        }

        // 构建MQTT订阅报文
        MqttSubscribePacket subscribePacket;
        subscribePacket.fixedHeader.controlPacketType = 0x82; // 订阅请求
        subscribePacket.fixedHeader.remainingLength = 0x0E; // 可变报头长度为14字节
        subscribePacket.packetIdentifier = 0x1234; // 包标识符
        memcpy(subscribePacket.topic, topic.c_str(), topic.length());
        subscribePacket.qos = qos; // QoS等级

        // 发送MQTT订阅报文
        if (send(sockfd, &subscribePacket, sizeof(subscribePacket), 0) < 0) {
            std::cerr << "Failed to send subscribe packet" << std::endl;
            return false;
        }

        // 接收MQTT服务器的响应
        uint8_t response[1024];
        ssize_t bytesRead = recv(sockfd, response, sizeof(response), 0);
        if (bytesRead <= 0) {
            std::cerr << "Failed to receive response" << std::endl;
            return false;
        }

        // 处理MQTT服务器的响应

        return true;
    }

    bool publish(const std::string& topic, const std::string& message, uint8_t qos) {
        if (!connected) {
            std::cerr << "Not connected" << std::endl;
            return false;
        }

        // 构建MQTT发布报文
        MqttPublishPacket publishPacket;
        publishPacket.fixedHeader.controlPacketType = 0x30; // 发布消息
        publishPacket.fixedHeader.remainingLength = 0x2D; // 可变报头长度为45字节
        publishPacket.topicLength = topic.length();
        memcpy(publishPacket.topic, topic.c_str(), topic.length());
        memcpy(publishPacket.payload, message.c_str(), message.length());

        // 发送MQTT发布报文
        if (send(sockfd, &publishPacket, sizeof(publishPacket), 0) < 0) {
            std::cerr << "Failed to send publish packet" << std::endl;
            return false;
        }

        // 处理MQTT服务器的响应

        return true;
    }

    ssize_t receive(uint8_t* buffer, size_t bufferSize) {
        if (!connected) {
            std::cerr << "Not connected" << std::endl;
            return -1;
        }

        return recv(sockfd, buffer, bufferSize, 0);
    }

private:
    int sockfd;
    bool connected;
};

在上述代码中,我们将MQTT连接功能封装到了一个名为MqttClient的类中,并提供了连接、断开、订阅主题、发送数据和接收数据等函数。你可以根据自己的实际需求调用这些函数。

例如,要连接到MQTT服务器,请使用以下代码:

MqttClient client;
if (client.connect("client_id")) {
    // 连接成功
} else {
    // 连接失败
}

要订阅主题,请使用以下代码:

if (client.subscribe("topic", 0x01)) {
    // 订阅成功
} else {
    // 订阅失败
}

要发布消息,请使用以下代码:

if (client.publish("topic", "message", 0x01)) {
    // 发布成功
} else {
    // 发布失败
}

要接收消息,请使用以下代码:

uint8_t buffer[1024];
ssize_t bytesRead = client.receive(buffer, sizeof(buffer));
if (bytesRead >= 0) {
    // 处理接收到的数据
} else {
    // 接收数据失败
}

下面是一个完整的MQTT连接示例,包括从用户输入地址和端口到订阅主题和发送消息的全部过程:

#include <iostream>
#include <string>

// 导入上述的MQTT连接库文件

int main() {
    std::string serverAddress;
    int serverPort;
    std::string clientId;
    std::string topic;

    // 获取用户输入的MQTT服务器地址和端口
    std::cout << "Enter MQTT server address: ";
    std::cin >> serverAddress;
    std::cout << "Enter MQTT server port: ";
    std::cin >> serverPort;
    std::cout << "Enter client ID: ";
    std::cin >> clientId;

    MqttClient client;

    // 连接到MQTT服务器
    if (client.connect(clientId)) {
        std::cout << "Connected to MQTT server" << std::endl;

        // 订阅主题
        std::cout << "Enter topic to subscribe: ";
        std::cin >> topic;
        if (client.subscribe(topic, 0x01)) {
            std::cout << "Subscribed to topic: " << topic << std::endl;
        } else {
            std::cerr << "Failed to subscribe to topic" << std::endl;
            return -1;
        }

        // 发布消息
        std::string message;
        std::cout << "Enter message to publish: ";
        std::cin.ignore(); // 忽略之前的换行符
        std::getline(std::cin, message);
        if (client.publish(topic, message, 0x01)) {
            std::cout << "Published message: " << message << std::endl;
        } else {
            std::cerr << "Failed to publish message" << std::endl;
            return -1;
        }

        // 接收消息
        uint8_t buffer[1024];
        ssize_t bytesRead = client.receive(buffer, sizeof(buffer));
        if (bytesRead >= 0) {
            std::string receivedMessage(reinterpret_cast<char*>(buffer), bytesRead);
            std::cout << "Received message: " << receivedMessage << std::endl;
        } else {
            std::cerr << "Failed to receive message" << std::endl;
            return -1;
        }

        client.disconnect();
        std::cout << "Disconnected from MQTT server" << std::endl;
    } else {
        std::cerr << "Failed to connect to MQTT server" << std::endl;
        return -1;
    }

    return 0;
}

在这个示例中,我们使用std::cin从用户那里获取了MQTT服务器的地址、端口、客户端ID以及要订阅和发布的主题和消息。然后,我们通过调用相应的函数来进行连接、订阅、发布和接收。

文章来源:https://blog.csdn.net/m0_52513940/article/details/135313112
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。