C++ 线程安全队列

2023-12-17 11:45:05

1. 函数版本

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Data packet handed from one pipeline stage to the next.
struct Packet {
    // Payload value. Zero by default so that a default-constructed Packet
    // never carries an indeterminate value.
    int data{0};
    // other data fields...
};

// Thread-safe FIFO queue.
//
// Every access to the underlying std::queue happens with mutex_ held.
// pop() blocks until an element is available; there is no timeout and no
// way to interrupt a waiting pop().
template <typename T>
class ThreadSafeQueue {
public:
    // Appends a copy of `value` and wakes one thread waiting in pop().
    void push(const T& value) {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            queue_.push(value);
        }
        // The lock is released before notifying so the woken thread can
        // acquire it immediately.
        condition_.notify_one();
    }

    // Removes and returns the oldest element, blocking while the queue is empty.
    T pop() {
        std::unique_lock<std::mutex> guard(mutex_);
        while (queue_.empty()) {
            condition_.wait(guard);
        }
        T head = queue_.front();
        queue_.pop();
        return head;
    }

    // Returns true when the queue holds no elements at the time of the call.
    bool empty() const {
        std::lock_guard<std::mutex> guard(mutex_);
        return queue_.empty();
    }

private:
    std::queue<T> queue_;
    mutable std::mutex mutex_;  // mutable so that empty() const can lock it
    std::condition_variable condition_;
};

// Exit flag: true while the producer is still generating packets.
// It is written by the producer thread and read by the consumer threads with
// no other synchronisation, so it must be atomic; a plain bool here is a data race.
std::atomic<bool> isRunning{true};

// Producer stage: pushes ten packets (data = 0..9) into packetQueue, pausing
// 100 ms after each push, then clears the global isRunning flag.
void producerThread(ThreadSafeQueue<Packet>& packetQueue) {
    const auto pauseBetweenPackets = std::chrono::milliseconds(100);

    int sequence = 0;
    while (sequence < 10) {
        Packet outgoing;
        outgoing.data = sequence;
        packetQueue.push(outgoing);
        std::this_thread::sleep_for(pauseBetweenPackets);
        ++sequence;
    }

    // All packets have been pushed: signal the other stages that no more will come.
    isRunning = false;
}

// Parser stage: moves every packet from packetQueue to parsedQueue until the
// producer has finished (isRunning is false) and packetQueue is drained.
//
// packetQueue.pop() blocks without timeout while the queue is empty, so it
// must never be called on an empty queue here: once the producer is done
// nothing would ever wake this thread again and join() would hang. The loop
// therefore polls empty() and only pops when an element is known to exist.
//
// Assumes this thread is the only consumer of packetQueue; with several
// consumers another thread could take the element between empty() and pop().
void parserThread(ThreadSafeQueue<Packet>& packetQueue, ThreadSafeQueue<Packet>& parsedQueue) {
    while (true) {
        // Sample the flag BEFORE inspecting the queue. The producer clears it
        // only after its last push, so "finished, then empty" guarantees that
        // no further packet can appear.
        const bool producerFinished = !isRunning;

        if (packetQueue.empty()) {
            if (producerFinished) {
                break;
            }
            // Nothing available yet: back off briefly instead of blocking in pop().
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
            continue;
        }

        Packet packet = packetQueue.pop();
        // Parsing logic would go here...
        // For now the packet is simply forwarded to parsedQueue.
        parsedQueue.push(packet);
    }
}

// Processing stage: moves every packet from parsedQueue to processedQueue,
// logging each one, until isRunning is false and parsedQueue is drained.
//
// parsedQueue.pop() blocks without timeout while the queue is empty, so the
// loop polls empty() and only pops when an element is known to exist;
// otherwise this thread would hang forever after the last packet.
//
// Assumes this thread is the only consumer of parsedQueue.
// NOTE(review): isRunning reflects the producer, not the parser. If the flag
// is already false while the parser still holds a packet it has not forwarded
// yet, this loop can exit before that packet arrives. The exit condition is
// the same as before; a dedicated "parser finished" signal would close the gap.
void processerThread(ThreadSafeQueue<Packet>& parsedQueue, ThreadSafeQueue<Packet>& processedQueue) {
    while (true) {
        // Sample the flag BEFORE inspecting the queue (see parser stage rationale:
        // the flag is cleared only after the producer's last push).
        const bool producerFinished = !isRunning;

        if (parsedQueue.empty()) {
            if (producerFinished) {
                break;
            }
            // Nothing available yet: back off briefly instead of blocking in pop().
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
            continue;
        }

        Packet packet = parsedQueue.pop();
        // Processing logic would go here...
        // For now the packet is simply forwarded to processedQueue.
        printf("processedQueue.push(%d)\n", (packet.data));
        processedQueue.push(packet);
    }
}

int main() {
    ThreadSafeQueue<Packet> packetQueue;
    ThreadSafeQueue<Packet> parsedQueue;
    ThreadSafeQueue<Packet> processedQueue;

    std::thread producer(producerThread, std::ref(packetQueue));
    std::thread parser(parserThread, std::ref(packetQueue), std::ref(parsedQueue));
    std::thread processer(processerThread, std::ref(parsedQueue), std::ref(processedQueue));

    producer.join();
    parser.join();
    processer.join();

    return 0;
}

#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>

// Data packet handed from one pipeline stage to the next.
struct Packet {
    // Payload value. Zero by default so that a default-constructed Packet
    // never carries an indeterminate value.
    int data{0};
    // other data fields...
};

// Thread-safe FIFO queue with cooperative shutdown.
//
// All state (the underlying queue and the running flag) is guarded by mutex_.
// Consumers block in pop()/waitAndPop() until an element is available or
// stop() has been called. stop() does not discard queued elements and does
// not reject later push() calls; it only releases consumers waiting on an
// empty queue.
template <typename T>
class ThreadSafeQueue {
private:
    std::queue<T> queue_;
    mutable std::mutex mutex_;  // mutable so that const observers can lock it
    std::condition_variable condition_;
    bool isRunning_{true};      // guarded by mutex_

    // Blocks until there is an element to take or the queue has been stopped.
    // `lock` must hold mutex_.
    void waitForItemOrStop(std::unique_lock<std::mutex>& lock) {
        condition_.wait(lock, [this] { return !queue_.empty() || !isRunning_; });
    }

public:
    ThreadSafeQueue() = default;

    // Appends a copy of `value` and wakes one waiting consumer.
    void push(const T& value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(value);
        }
        condition_.notify_one();
    }

    // Removes the oldest element into `out`, blocking while the queue is
    // empty and still running.
    //
    // Returns true when an element was stored in `out`; returns false (leaving
    // `out` untouched) when the queue is stopped and fully drained. Unlike
    // pop(), this lets the caller tell a real element apart from "closed".
    bool waitAndPop(T& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        waitForItemOrStop(lock);
        if (queue_.empty()) {
            return false;
        }
        out = std::move(queue_.front());
        queue_.pop();
        return true;
    }

    // Removes and returns the oldest element, blocking while the queue is
    // empty and still running. Returns a value-initialised T when the queue
    // is stopped and drained; callers that must distinguish that case from a
    // real element should use waitAndPop() instead.
    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        waitForItemOrStop(lock);

        if (queue_.empty()) {
            return {};  // empty value: the queue has been closed
        }

        T value = std::move(queue_.front());
        queue_.pop();
        return value;
    }

    // Returns true when the queue holds no elements at the time of the call.
    bool empty() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.empty();
    }

    // Returns false once stop() has been called.
    bool isRunning() const {
        // Read under the lock: the flag is written by another thread in stop().
        std::lock_guard<std::mutex> lock(mutex_);
        return isRunning_;
    }

    // Marks the queue as closed and wakes every waiting consumer.
    void stop() {
        {
            // The flag must change while holding the mutex. Otherwise a
            // consumer could evaluate the wait predicate, see "running", and
            // go to sleep right after the notification below was sent,
            // missing the wakeup forever.
            std::lock_guard<std::mutex> lock(mutex_);
            isRunning_ = false;
        }
        condition_.notify_all();
    }

    // Exposes the internal mutex. The mutex is not recursive: do not call any
    // other member of this queue while holding it, or the call will deadlock.
    std::mutex& getMutex() {
        return mutex_;
    }
};

// Producer stage: pushes ten packets (data = 0..9) into the queue, pausing
// 100 ms after each one, then stops the queue to announce the end of input.
class Producer {
public:
    void operator()(ThreadSafeQueue<Packet>& packetQueue) {
        for (int i = 0; i < 10; ++i) {
            Packet packet;
            packet.data = i;
            printf("packetQueue.push(%d)\n", (packet.data));

            packetQueue.push(packet);
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }

        packetQueue.stop();  // tell consumers that no more data will arrive
    }
};

// Parser stage: forwards every packet from packetQueue to parsedQueue until
// packetQueue is stopped and drained, then stops parsedQueue so the next
// stage can terminate as well.
class Parser {
public:
    void operator()(ThreadSafeQueue<Packet>& packetQueue, ThreadSafeQueue<Packet>& parsedQueue) {
        Packet packet;
        // waitAndPop() returns false only when the input is closed AND empty,
        // so every real packet (including the last one) is forwarded.
        while (packetQueue.waitAndPop(packet)) {
            printf("parsedQueue.push(%d)\n", (packet.data));

            // Parsing logic would go here...
            // For now the packet is simply forwarded to parsedQueue.
            parsedQueue.push(packet);
        }

        // Propagate shutdown downstream; without this the consumer of
        // parsedQueue would wait forever on an empty queue.
        parsedQueue.stop();
    }
};

// Processing stage: forwards every packet from parsedQueue to processedQueue
// until parsedQueue is stopped and drained, then stops processedQueue.
class Processer {
public:
    void operator()(ThreadSafeQueue<Packet>& parsedQueue, ThreadSafeQueue<Packet>& processedQueue) {
        Packet packet;
        while (parsedQueue.waitAndPop(packet)) {
            // Processing logic would go here...
            // For now the packet is simply forwarded to processedQueue.
            printf("processedQueue.push(%d)\n", (packet.data));
            processedQueue.push(packet);
        }

        // Mark the output as complete for any consumer of processedQueue.
        processedQueue.stop();
    }
};


int main() {
    ThreadSafeQueue<Packet> packetQueue;
    ThreadSafeQueue<Packet> parsedQueue;
    ThreadSafeQueue<Packet> processedQueue;

    Producer producer;
    Parser parser;
    Processer processer;

    std::thread producerThread(producer, std::ref(packetQueue));
    std::thread parserThread(parser, std::ref(packetQueue), std::ref(parsedQueue));
    std::thread processerThread(processer, std::ref(parsedQueue), std::ref(processedQueue));

    producerThread.join();
    parserThread.join();
    processerThread.join();

    return 0;
}

文章来源:https://blog.csdn.net/wq_ocean_/article/details/135042246
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。