c语言多线程队列实现

2023-12-14 17:48:41

为了用c语言实现队列进行多线程通信,用于实现一个状态机。
下面是实现过程

1.实现多线程队列入栈和出栈,不加锁

发送线程发送字符1,接收线程接收字符并打印。
多线程没有加锁,会有危险

#include "stdio.h"
#include <thread>
#include <unistd.h>
#include <pthread.h>


typedef struct MutiThreadCharQueNode
{
    unsigned char data;
    struct MutiThreadCharQueNode* next;
}MutiThreadCharQueNode;

typedef struct MutiThreadCharQueue
{
    MutiThreadCharQueNode* phead;
    MutiThreadCharQueNode* ptail;
    int size;
}MutiThreadCharQueue;
MutiThreadCharQueue TestMutiThreadQue;

void MutiThreadCharQueueInit(MutiThreadCharQueue* pq)
{
    pq->phead=NULL; //将队列的头指针置为空
    pq->ptail = NULL;//将队列的尾指针置为空
    pq->size = 0;// 将队列的头指针置为空
}
bool MutiThreadCharQueueEmpty(MutiThreadCharQueue* pq)
{
    return pq->size == 0;
}
void MutiThreadCharQueueDestroy(MutiThreadCharQueue* pq)
{
    MutiThreadCharQueNode* cur = pq->phead;// 创建一个指针 cur,指向队列的头指针
    while (cur)
    {
        MutiThreadCharQueNode* next = cur->next;// 创建一个指针 cur,指向队列的头指针
        free(cur);// 释放当前节点的内存
        cur = next;// 将指针 cur 移动到下一个节点
    }
    pq->phead = pq->ptail = NULL;// 将队列的头指针和尾指针置为空
    pq->size = 0;// 将队列的大小置为0
}
void MutiThreadCharQueuePush(MutiThreadCharQueue* pq, unsigned char x)
{

    MutiThreadCharQueNode* newnode = (MutiThreadCharQueNode*)malloc(sizeof(MutiThreadCharQueNode));// 创建一个新的节点
    if (newnode == NULL)
    {

        return;
    }
    newnode->data = x;// 设置新节点的数据为传入的元素值
    newnode->next = NULL;// 将新节点的指针域置空

    //一个节点

    if (pq->ptail == NULL)// 判断队列是否为空
    {
        pq->phead = pq->ptail = newnode;// 将新节点同时设置为队列的头节点和尾节点
    }
    //多个节点
    else
    {
        pq->ptail->next = newnode;// 将新节点同时设置为队列的头节点和尾节点
        pq->ptail = newnode;// 更新队列的尾指针为新节点
    }
    pq->size++;// 增加队列的大小计数
}
unsigned char MutiThreadCharQueueFront(MutiThreadCharQueue* pq)
{
//	assert(pq);// 检查指针是否为空
//	assert(!QueueEmpty(pq));// 检查队列是否非空
//	assert(pq->phead);// 检查队列的头指针是否存在
//    if(QueueEmpty(pq))
//    {
//        return ;
//    }
    return pq->phead->data;// 返回队列头节点的数据
}
void MutiThreadCharQueuePop(MutiThreadCharQueue* pq)
{

    //1.一个节点
    if (pq->phead->next == NULL) // 队列只有一个节点的情况
    {
        free(pq->phead); // 释放队列头节点的内存
        pq->phead = pq->ptail = NULL;// 将队列的头指针和尾指针置为空
    }
    //2.多个节点
    else
    {
        MutiThreadCharQueNode* next = pq->phead->next; //保存队列头节点的下一个节点指针
        free(pq->phead);// 释放队列头节点的内存
        pq->phead = next;// 更新队列的头指针为下一个节点
    }
    pq->size--;//减少队列的大小计数
}
void* thread_send(void* para)
{
    printf("hh\n");
    MutiThreadCharQueueInit(&TestMutiThreadQue);
    unsigned char sendChar=1;
    while(1)
    {
        printf("send a\n");
        MutiThreadCharQueuePush(&TestMutiThreadQue,sendChar);
        usleep(1000000);
    }
}
void* thread_rev(void* para)
{
    printf("h2\n");

    unsigned char revChar;
    while(1)
    {

        if(false==MutiThreadCharQueueEmpty(&TestMutiThreadQue))
        {
            revChar=MutiThreadCharQueueFront(&TestMutiThreadQue);

            printf("rev char= %d\n",(int)revChar);
            MutiThreadCharQueuePop(&TestMutiThreadQue);
        }
        usleep(1000000);
    }
}

void create_c_thread_send()
{
    int ret;
    pthread_attr_t attr;
    ret = pthread_attr_init(&attr);
     if (ret != 0) {
        return ;
     }

     //2
     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

    int err;
    pthread_t tid;
    err = pthread_create(&tid, &attr, thread_send, (void*)NULL);

}
void create_c_thread_rev()
{
    int ret;
    pthread_attr_t attr;
    ret = pthread_attr_init(&attr);
     if (ret != 0) {
        return ;
     }

     //2
     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

    int err;
    pthread_t tid;
    err = pthread_create(&tid, &attr, thread_rev, (void*)NULL);

}
int main(int argc, char** argv)
{

  printf("hello\n");

  create_c_thread_send();
  create_c_thread_rev();
  while(1)
  {
      usleep(10000);
  }
  return 0;
}

2.实现多线程队列入栈和出栈,加锁并使用信号量触发接收线程

在队列的结构体中加上锁,防止多线程冲突

#include "stdio.h"
#include <thread>
#include <unistd.h>
#include <pthread.h>


typedef struct MutiThreadCharQueNode
{
    unsigned char data;
    struct MutiThreadCharQueNode* next;
}MutiThreadCharQueNode;

typedef struct MutiThreadCharQueue
{
    MutiThreadCharQueNode* phead;
    MutiThreadCharQueNode* ptail;
    int size;
    pthread_mutex_t mutex;
}MutiThreadCharQueue;
MutiThreadCharQueue TestMutiThreadQue;

void MutiThreadCharQueueInit(MutiThreadCharQueue* pq)
{
    pq->phead=NULL; //将队列的头指针置为空
    pq->ptail = NULL;//将队列的尾指针置为空
    pq->size = 0;// 将队列的头指针置为空
    pthread_mutex_init(&pq->mutex, NULL);
}
bool MutiThreadCharQueueEmpty(MutiThreadCharQueue* pq)
{
    pthread_mutex_lock(&pq->mutex);
    bool bEmpty=(bool) (pq->size == 0);
    pthread_mutex_unlock(&pq->mutex);
    return bEmpty;
}
void MutiThreadCharQueueDestroy(MutiThreadCharQueue* pq)
{
    pthread_mutex_lock(&pq->mutex);
    MutiThreadCharQueNode* cur = pq->phead;// 创建一个指针 cur,指向队列的头指针
    while (cur)
    {
        MutiThreadCharQueNode* next = cur->next;// 创建一个指针 cur,指向队列的头指针
        free(cur);// 释放当前节点的内存
        cur = next;// 将指针 cur 移动到下一个节点
    }
    pq->phead = pq->ptail = NULL;// 将队列的头指针和尾指针置为空
    pq->size = 0;// 将队列的大小置为0
    pthread_mutex_unlock(&pq->mutex);
}
void MutiThreadCharQueuePush(MutiThreadCharQueue* pq, unsigned char x)
{
    pthread_mutex_lock(&pq->mutex);
    MutiThreadCharQueNode* newnode = (MutiThreadCharQueNode*)malloc(sizeof(MutiThreadCharQueNode));// 创建一个新的节点
    if (newnode == NULL)
    {
        pthread_mutex_unlock(&pq->mutex);
        return;
    }
    newnode->data = x;// 设置新节点的数据为传入的元素值
    newnode->next = NULL;// 将新节点的指针域置空

    //一个节点

    if (pq->ptail == NULL)// 判断队列是否为空
    {
        pq->phead = pq->ptail = newnode;// 将新节点同时设置为队列的头节点和尾节点
    }
    //多个节点
    else
    {
        pq->ptail->next = newnode;// 将新节点同时设置为队列的头节点和尾节点
        pq->ptail = newnode;// 更新队列的尾指针为新节点
    }
    pq->size++;// 增加队列的大小计数
    pthread_mutex_unlock(&pq->mutex);
}
unsigned char MutiThreadCharQueueFront(MutiThreadCharQueue* pq)
{
//    if(QueueEmpty(pq))
//    {
//        return ;
//    }
    pthread_mutex_lock(&pq->mutex);
    char data=pq->phead->data;// 返回队列头节点的数据
    pthread_mutex_unlock(&pq->mutex);
    return data;
}
void MutiThreadCharQueuePop(MutiThreadCharQueue* pq)
{
    pthread_mutex_lock(&pq->mutex);
    //1.一个节点
    if (pq->phead->next == NULL) // 队列只有一个节点的情况
    {
        free(pq->phead); // 释放队列头节点的内存
        pq->phead = pq->ptail = NULL;// 将队列的头指针和尾指针置为空
    }
    //2.多个节点
    else
    {
        MutiThreadCharQueNode* next = pq->phead->next; //保存队列头节点的下一个节点指针
        free(pq->phead);// 释放队列头节点的内存
        pq->phead = next;// 更新队列的头指针为下一个节点
    }
    pq->size--;//减少队列的大小计数
    pthread_mutex_unlock(&pq->mutex);
}
void* thread_send(void* para)
{
    printf("hh\n");
    MutiThreadCharQueueInit(&TestMutiThreadQue);
    unsigned char sendChar=1;
    while(1)
    {
        printf("send a\n");
        MutiThreadCharQueuePush(&TestMutiThreadQue,sendChar);
        usleep(1000000);
    }
}
void* thread_rev(void* para)
{
    printf("h2\n");

    unsigned char revChar;
    while(1)
    {

        if(false==MutiThreadCharQueueEmpty(&TestMutiThreadQue))
        {
            revChar=MutiThreadCharQueueFront(&TestMutiThreadQue);

            printf("rev char= %d\n",(int)revChar);
            MutiThreadCharQueuePop(&TestMutiThreadQue);
        }
        usleep(1000000);
    }
}

void create_c_thread_send()
{
    int ret;
    pthread_attr_t attr;
    ret = pthread_attr_init(&attr);
     if (ret != 0) {
        return ;
     }

     //2
     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

    int err;
    pthread_t tid;
    err = pthread_create(&tid, &attr, thread_send, (void*)NULL);

}
void create_c_thread_rev()
{
    int ret;
    pthread_attr_t attr;
    ret = pthread_attr_init(&attr);
     if (ret != 0) {
        return ;
     }

     //2
     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

    int err;
    pthread_t tid;
    err = pthread_create(&tid, &attr, thread_rev, (void*)NULL);

}
int main(int argc, char** argv)
{

  printf("hello\n");

  create_c_thread_send();
  create_c_thread_rev();
  while(1)
  {
      usleep(10000);
  }
  return 0;
}

3.实现任意数据类型的多线程队列

以上的队列数据类型固定了,希望实现一个通用的多线程队列,并且数据可以得到释放。

#include "stdio.h"
#include <thread>
#include <unistd.h>
#include <pthread.h>
#include <string.h>

typedef struct MutiThreadQueNode
{
    void* data;

    struct MutiThreadQueNode* next;
}MutiThreadQueNode;

typedef struct MutiThreadQueue
{
    MutiThreadQueNode* phead;
    MutiThreadQueNode* ptail;
    int size;
    int data_mem_size;
    pthread_mutex_t mutex;
}MutiThreadQueue;

typedef struct TestMyStructData
{
    int my_int_data;
    float my_float_data;
}TestMyStructData;

MutiThreadQueue TestMutiThreadQue;

void MutiThreadQueueInit(MutiThreadQueue* pq,int data_mem_size)
{
    pq->phead=NULL; //将队列的头指针置为空
    pq->ptail = NULL;//将队列的尾指针置为空
    pq->size = 0;// 将队列的头指针置为空
    pq->data_mem_size=data_mem_size;
    pthread_mutex_init(&pq->mutex, NULL);
}
bool MutiThreadQueueEmpty(MutiThreadQueue* pq)
{
    pthread_mutex_lock(&pq->mutex);
    bool bEmpty=(bool) (pq->size == 0);
    pthread_mutex_unlock(&pq->mutex);
    return bEmpty;
}
void MutiThreadQueueDestroy(MutiThreadQueue* pq)
{
    pthread_mutex_lock(&pq->mutex);
    MutiThreadQueNode* cur = pq->phead;// 创建一个指针 cur,指向队列的头指针
    while (cur)
    {
        MutiThreadQueNode* next = cur->next;// 创建一个指针 cur,指向队列的头指针
        //!由于data是拷贝过来的,释放data内存
        free(cur->data);
        free(cur);// 释放当前节点的内存
        cur = next;// 将指针 cur 移动到下一个节点
    }
    pq->phead = pq->ptail = NULL;// 将队列的头指针和尾指针置为空
    pq->size = 0;// 将队列的大小置为0
    pthread_mutex_unlock(&pq->mutex);
}
void MutiThreadQueuePush(MutiThreadQueue* pq, void* data,int data_mem_size)
{
    pthread_mutex_lock(&pq->mutex);
    MutiThreadQueNode* newnode = (MutiThreadQueNode*)malloc(sizeof(MutiThreadQueNode));// 创建一个新的节点
    if (newnode == NULL)
    {
        pthread_mutex_unlock(&pq->mutex);
        return;
    }
    if(pq->data_mem_size!=data_mem_size)
    {
        printf("input data error\n");
        pthread_mutex_unlock(&pq->mutex);
        return;
    }
    void* queData=malloc(pq->data_mem_size);
    memcpy(queData,data,pq->data_mem_size);
    newnode->data = queData;// 设置新节点的数据为传入的元素值
    newnode->next = NULL;// 将新节点的指针域置空

    //一个节点

    if (pq->ptail == NULL)// 判断队列是否为空
    {
        pq->phead = pq->ptail = newnode;// 将新节点同时设置为队列的头节点和尾节点
    }
    //多个节点
    else
    {
        pq->ptail->next = newnode;// 将新节点同时设置为队列的头节点和尾节点
        pq->ptail = newnode;// 更新队列的尾指针为新节点
    }
    pq->size++;// 增加队列的大小计数
    pthread_mutex_unlock(&pq->mutex);
}
void MutiThreadQueueFront(MutiThreadQueue* pq,void* outData,int data_mem_size)
{
//    if(QueueEmpty(pq))
//    {
//        return ;
//    }
    pthread_mutex_lock(&pq->mutex);
    if(data_mem_size!=pq->data_mem_size)
    {
        printf("input data_mem_size error\n");
        pthread_mutex_unlock(&pq->mutex);
        return ;
    }
    memcpy(outData,pq->phead->data,pq->data_mem_size);
    pthread_mutex_unlock(&pq->mutex);

}
void MutiThreadQueuePop(MutiThreadQueue* pq)
{
    pthread_mutex_lock(&pq->mutex);
    //1.一个节点
    if (pq->phead->next == NULL) // 队列只有一个节点的情况
    {
        free(pq->phead); // 释放队列头节点的内存
        pq->phead = pq->ptail = NULL;// 将队列的头指针和尾指针置为空
    }
    //2.多个节点
    else
    {
        MutiThreadQueNode* next = pq->phead->next; //保存队列头节点的下一个节点指针
        //!由于data是拷贝过来的,释放data内存
        free(pq->phead->data);
        free(pq->phead);// 释放队列头节点的内存
        pq->phead = next;// 更新队列的头指针为下一个节点
    }
    pq->size--;//减少队列的大小计数
    pthread_mutex_unlock(&pq->mutex);
}
void* thread_send(void* para)
{
    printf("hh\n");
    TestMyStructData mySendData;
    mySendData.my_int_data=1;
    mySendData.my_float_data=2;
    MutiThreadQueueInit(&TestMutiThreadQue,sizeof(TestMyStructData));

    while(1)
    {
        printf("send 1\n");
        MutiThreadQueuePush(&TestMutiThreadQue,&mySendData,sizeof(TestMyStructData));
        usleep(1000000);
    }
}
void* thread_rev(void* para)
{
    printf("h2\n");
    TestMyStructData myRevData;

    while(1)
    {

        if(false==MutiThreadQueueEmpty(&TestMutiThreadQue))
        {
            MutiThreadQueueFront(&TestMutiThreadQue,&myRevData,sizeof(TestMyStructData));

            printf("rev intdata= %d float data=%f\n",myRevData.my_int_data,myRevData.my_float_data);
            MutiThreadQueuePop(&TestMutiThreadQue);
        }
        usleep(1000000);
    }
}

void create_c_thread_send()
{
    int ret;
    pthread_attr_t attr;
    ret = pthread_attr_init(&attr);
     if (ret != 0) {
        return ;
     }

     //2
     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

    int err;
    pthread_t tid;
    err = pthread_create(&tid, &attr, thread_send, (void*)NULL);

}
void create_c_thread_rev()
{
    int ret;
    pthread_attr_t attr;
    ret = pthread_attr_init(&attr);
     if (ret != 0) {
        return ;
     }

     //2
     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

    int err;
    pthread_t tid;
    err = pthread_create(&tid, &attr, thread_rev, (void*)NULL);

}
int main(int argc, char** argv)
{

  printf("hello\n");

  create_c_thread_send();
  create_c_thread_rev();
  while(1)
  {
      usleep(10000);
  }
  return 0;
}

4.队列其他操作

队列操作可以完善的点
1.加上队列最大限制,如果队列内数据大小超过阈值,清空队列

文章来源:https://blog.csdn.net/ktigerhero3/article/details/134988580
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。