mosquitto发布端和订阅端代码范例

2024-01-08 09:30:44

我也是复制的,没有测试。应该能正常工作中。

发布端

/*********************************************************************************
?* ? ? ?Copyright: ?(C) 2022 Ye Xingwei<2929273315@qq.com>
?* ? ? ? ? ? ? ? ? ?All rights reserved.
?*
?* ? ? ? Filename: ?subscribe.c
?* ? ?Description: ?This file MQTT_pub
?* ? ? ? ? ? ? ? ??
?* ? ? ? ?Version: ?1.0.0(2022年01月04日)
?* ? ? ? ? Author: ?Ye Xingwei <2929273315@qq.com>
?* ? ? ?ChangeLog: ?1, Release initial version on "2022年01月04日 15时08分27秒"
?* ? ? ? ? ? ? ? ??
?********************************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <mosquitto.h>
#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <dirent.h>
#include <signal.h>
#include <ctype.h>

#include "cJSON.h"


#define HOST ? ? ? ? ? "localhost"
#define PORT ? ? ? ? ? ?1883
#define KEEP_ALIVE ? ? ?60
#define MSG_MAX_SIZE ? ?512


static int ?g_stop = 0;


void mqtt_connect_callback(struct mosquitto *mosq, void *obj, int rc);
void mqtt_disconnect_callback(struct mosquitto *mosq, void *obj, int rc);
int get_time(char *datetime, int bytes);
int get_temperature(float *temp);
int get_ipaddr(char *interface,char *ipaddr,int ipaddr_size);
void sig_handle(int signum);

int main (int argc, char **argv)
{
? ? int ? ? ? ? ? ? ? ? rv;
? ? struct mosquitto ? ?*mosq = NULL;

? ? /*安装信号*/
? ? signal(SIGUSR1,sig_handle);
? ??
? ??
? ??
? ??
? ? /* MQTT 初始化 */
? ? rv = mosquitto_lib_init();
? ? if(rv != MOSQ_ERR_SUCCESS)
? ? {
? ? ? ? printf("mosquitto lib int failure:%s\n", strerror(errno));
? ? ? ? goto cleanup;
? ? }
? ??
? ? /* 创建新的客户端 */
? ? mosq = mosquitto_new(NULL,true,NULL);
? ? if(!mosq)
? ? {
? ? ? ? printf("create client failure:%s\n",strerror(errno));
? ? ? ? goto cleanup;
? ? }
? ??
? ? /* 回调函数 */
? ? mosquitto_connect_callback_set(mosq, mqtt_connect_callback);
? ??
? ??
? ??
? ? while(!g_stop)
? ? {
? ? ? ? /* ?连接MQTT服务器,ip,端口,时间 */?
? ? ? ? if(mosquitto_connect(mosq,HOST,PORT,KEEP_ALIVE) != MOSQ_ERR_SUCCESS)
? ? ? ? {
? ? ? ? ? ? printf("mosquitto_connect() failed: %s\n",strerror(errno));
? ? ? ? ? ? goto cleanup;
? ? ? ? }
? ? ? ? printf("connect successfully\n");

? ? ? ? /* 无阻塞 断线连接 */
? ? ? ? mosquitto_loop_forever(mosq,-1,1);


? ? ? ? sleep(10);

? ? }

cleanup:?
? ? mosquitto_destroy(mosq);
? ? mosquitto_lib_cleanup();
? ? return 0;
}?

/*确认连接回函数*/
void mqtt_connect_callback(struct mosquitto *mosq, void *obj, int rc)
{
? ? char ? ? ? ? ? ? ? ? ? ?ipaddr[16];
? ? char ? ? ? ? ? ? ? ? ? ?*interface="eth0";
? ? char ? ? ? ? ? ? ? ? ? ?datetime[64];
? ? cJSON ? ? ? ? ? ? ? ? ? *root;
? ? cJSON ? ? ? ? ? ? ? ? ? *item;
? ? char ? ? ? ? ? ? ? ? ? ?*msg;
? ? struct mqtt_user_data ? *mqtt;


? ? printf("Connection successful cJSON call packaging\n");

? ? float temper = 0.000000;

? ? if(get_temperature(&temper) < 0)
? ? {
? ? ? ? printf("get_temperature failed.\n");
? ? ? ? return;
? ? }

? ? if(get_time(datetime,sizeof(datetime))<0)
? ? {
? ? ? ? printf("get_time failure\n");
? ? ? ? return ;
? ? }

? ? memset(ipaddr,0,sizeof(ipaddr));
? ? if(get_ipaddr(interface,ipaddr,sizeof(ipaddr))<0)
? ? {
? ? ? ? printf("ERROR:get ip address failure\n");
? ? ? ? return ;
? ? }


? ? root = cJSON_CreateObject();
? ? item = cJSON_CreateObject();

? ? /* cJSON打包 */
? ? cJSON_AddItemToObject(root,"id",cJSON_CreateString(ipaddr));
? ? cJSON_AddItemToObject(root,"time",cJSON_CreateString(datetime));
? ? cJSON_AddItemToObject(root,"Temperature",cJSON_CreateNumber(temper));
? ?
? ? msg = cJSON_Print(root);
? ? //printf("%s\n",msg);


? ? if(!rc)
? ? {
? ? ? ? if(mosquitto_publish(mosq,NULL,"temp",strlen(msg),msg,0,NULL) != MOSQ_ERR_SUCCESS)
? ? ? ? {
? ? ? ? ? ? printf("mosquitto_publish failed: %s\n",strerror(errno));
? ? ? ? ? ? return;
? ? ? ? }
? ? }

? ? mosquitto_disconnect(mosq);
}

/* 获取时间 */
int get_time(char *datetime, int bytes)
{
? ? time_t ? ? ? ? ? ? ?now;
? ? struct tm ? ? ? ? ?*t;

? ? time(&now);
? ? t = localtime(&now);

? ? snprintf(datetime, bytes, "%04d-%02d-%02d %02d:%02d:%02d", t->tm_year + 1900, t->tm_mon + 1, t->tm_mday, (t->tm_hour)+8, t->tm_min, t->tm_sec);

? ? return 0;
}
/* 安装信号 */
void sig_handle(int signum)
{
? ? if(SIGUSR1 == signum)
? ? {
? ? ? ? g_stop = 1;
? ? }
}

/* 获取温度 */
int get_temperature(float *temp)
{
? ? int ? ? fd = -1;
? ? char ? ?buf[128];
? ? char ? ?*ptr=NULL;
? ? DIR ? ? *dirp = NULL;
? ? struct dirent *direntp = NULL;
? ? char ? ?w1_path[64]="/sys/bus/w1/devices/";
? ? char ? ?chip_sn[32];
? ? int ? ? found = 0;


? ? dirp=opendir(w1_path);
? ? if(!dirp)
? ? {
? ? ? ? printf("open foldir %s failure:%s\n",w1_path,strerror(errno));
? ? ? ? return -1;
? ? }

? ? while(NULL!=(direntp=readdir(dirp)))
? ? {
? ? ? ? if(strstr(direntp->d_name,"28-"))
? ? ? ? {
? ? ? ? ? ? strncpy(chip_sn, direntp->d_name,sizeof(chip_sn));
? ? ? ? ? ? found = -1;
? ? ? ? }
? ? }
? ? closedir(dirp);
? ? if(!found)
? ? {
? ? ? ? printf("can not find ds18b20 chipset\n");
? ? ? ? return ?-2;
? ? }


? ? strncat(w1_path,chip_sn,sizeof(w1_path)-strlen(w1_path));
? ? strncat(w1_path,"/w1_slave",sizeof(w1_path)-strlen(w1_path));

? ? if((fd = open(w1_path,O_RDONLY))<0)
? ? {
? ? ? ? printf("File opened successfully:%s\n",strerror(errno));
? ? ? ? return -3;
? ? }

? ? memset(buf, 0, sizeof(buf));
? ? if(read(fd, buf, sizeof(buf))<0)
? ? {
? ? ? ? printf("read data from fd=%d failure:%s\n",fd,strerror(errno));
? ? ? ? return -4;
? ? }

? ? ptr = strstr(buf,"t=");
? ? if(!ptr)
? ? {
? ? ? ? printf("t=string\n");
? ? ? ? return -5;

? ? }

? ? ptr+= 2;
? ? *temp = atof(ptr)/1000;
? ? close(fd);
? ? return 0;

}


/* 获取IP地址 */
int get_ipaddr(char *interface,char *ipaddr,int ipaddr_size)
{
? ? char ? ? ? ? ? ?buf[1024];
? ? char ? ? ? ? ? ?*ptr;
? ? char ? ? ? ? ? ?*ip_start;
? ? char ? ? ? ? ? ?*ip_end;
? ? FILE ? ? ? ? ? ?*fp;
? ? int ? ? ? ? ? ? len;
? ? int ? ? ? ? ? ? rv;

? ? if(!interface || !ipaddr || ipaddr_size <16)
? ? {
? ? ? ? printf("Invalid input argument\n");
? ? ? ? return -2;
? ? }

? ? memset(buf, 0 , sizeof(buf));

? ? snprintf(buf,sizeof(buf),"ifconfig %s",interface);
? ? fp = popen(buf,"r");
? ? if(NULL==fp)
? ? {
? ? ? ? printf("popen() to extern command\"%s\"failure:%s\n",buf,strerror(errno));
? ? ? ? return -2;
? ? }
? ? rv = -3;
? ? while(fgets(buf,sizeof(buf),fp))
? ? {
? ? ? ? if(strstr(buf,"netmask"))
? ? ? ? {
? ? ? ? ? ? ptr = strstr(buf,"inet");
? ? ? ? ? ? if(!ptr)
? ? ? ? ? ? {
? ? ? ? ? ? ? ? break;
? ? ? ? ? ? }
? ? ? ? ? ? ptr +=strlen("inet");

? ? ? ? ? ? while(isblank(*ptr))
? ? ? ? ? ? ? ? ptr++;
? ? ? ? ? ? ip_start = ptr;
? ? ? ? ? ? while(!isblank(*ptr))
? ? ? ? ? ? ? ? ptr++;
? ? ? ? ? ? ip_end = ptr;
? ? ? ? ? ? memset(ipaddr,0,sizeof(ipaddr));

? ? ? ? ? ? len = ip_end-ip_start;
? ? ? ? ? ? len = len>ipaddr_size ? ipaddr_size:len;

? ? ? ? ? ? memcpy(ipaddr,ip_start,len);
? ? ? ? ? ? rv = 0;

? ? ? ? ? ? break;
? ? ? ? }
? ? }
? ? pclose(fp);
? ? return rv;
}

订阅端

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mosquitto.h>

#include "cJSON.h"


#define HOST "localhost"
#define PORT  1883
#define KEEP_ALIVE 60
#define MSG_MAX_SIZE  512

static int running = 1;


/* 确认连接回调函数 */
void mqtt_connect_callback(struct mosquitto *mosq, void *obj, int rc)
{
    printf("Confirm the connection to the client\n");
    if(rc)
    {
        printf("on_connect error!\n");
        exit(1);
    }
    else
    {
        if(mosquitto_subscribe(mosq, NULL, "temp", 2))
        {
            printf("Set the topic error!\n");
            exit(1);
        }
    }
}


/*获取到订阅的内容*/
void mqtt_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg)
{

        printf("Obtaining content successfully\n");
        printf("\n");
        printf("Succeeded in obtaining the time and temperature:%s\n", (char *)msg->payload);
}



int main (int argc, char **argv)
{
    int                 ret;
    struct mosquitto    *mosq;

    /* MQTT 初始化 */
    ret = mosquitto_lib_init();
    if(ret)
    {
        printf("Init lib error!\n");
        goto cleanup;
        return -1;
    }

    /* 创建新的客户端 */
    mosq = mosquitto_new(NULL,true, NULL);
    if(mosq == NULL)
    {
        printf("Create a new client failure\n");
        goto cleanup;
        return -1;
    }
    /* 回调函数 */
    mosquitto_connect_callback_set(mosq, mqtt_connect_callback);
    mosquitto_message_callback_set(mosq, mqtt_message_callback);

    /* 连接代理 */
    ret = mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE);
    if(ret)
    {

        printf("Connect server error!\n");
        goto cleanup;
        return -1;
    }
    printf("connection client is OK\n");

    while(running)
    {
        mosquitto_loop(mosq, -1, 1);
    }
    

/* 释放 清空 */
cleanup:
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
    return 0;
} 

文章来源:https://blog.csdn.net/quantum7/article/details/135123153
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。