C++队列线程安全
2023-12-17 11:45:05
1.函数
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
// 数据包结构体
struct Packet {
int data;
// 其他数据字段...
};
// 线程安全的队列
template <typename T>
class ThreadSafeQueue {
private:
std::queue<T> queue_;
mutable std::mutex mutex_;
std::condition_variable condition_;
public:
void push(const T& value) {
std::unique_lock<std::mutex> lock(mutex_);
queue_.push(value);
lock.unlock();
condition_.notify_one();
}
T pop() {
std::unique_lock<std::mutex> lock(mutex_);
condition_.wait(lock, [this] { return !queue_.empty(); });
T value = queue_.front();
queue_.pop();
return value;
}
bool empty() const {
std::unique_lock<std::mutex> lock(mutex_);
return queue_.empty();
}
};
bool isRunning = true; // 退出标志
// 生产者线程
void producerThread(ThreadSafeQueue<Packet>& packetQueue) {
for (int i = 0; i < 10; ++i) {
Packet packet;
packet.data = i;
packetQueue.push(packet);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
isRunning = false; // 生产者线程完成任务后设置退出标志
}
// 解析线程
void parserThread(ThreadSafeQueue<Packet>& packetQueue, ThreadSafeQueue<Packet>& parsedQueue) {
while (isRunning || !packetQueue.empty()) {
Packet packet = packetQueue.pop();
// 解析数据的逻辑...
// 这里简单地将数据放入parsedQueue中
parsedQueue.push(packet);
}
}
// 处理线程
void processerThread(ThreadSafeQueue<Packet>& parsedQueue, ThreadSafeQueue<Packet>& processedQueue) {
while (isRunning || !parsedQueue.empty()) {
Packet packet = parsedQueue.pop();
// 处理数据的逻辑...
// 这里简单地将数据放入processedQueue中
printf("processedQueue.push(%d)\n", (packet.data));
processedQueue.push(packet);
}
}
int main() {
ThreadSafeQueue<Packet> packetQueue;
ThreadSafeQueue<Packet> parsedQueue;
ThreadSafeQueue<Packet> processedQueue;
std::thread producer(producerThread, std::ref(packetQueue));
std::thread parser(parserThread, std::ref(packetQueue), std::ref(parsedQueue));
std::thread processer(processerThread, std::ref(parsedQueue), std::ref(processedQueue));
producer.join();
parser.join();
processer.join();
return 0;
}
类
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
// 数据包结构体
struct Packet {
int data;
// 其他数据字段...
};
// 线程安全的队列
template <typename T>
class ThreadSafeQueue {
private:
std::queue<T> queue_;
mutable std::mutex mutex_;
std::condition_variable condition_;
bool isRunning_;
public:
ThreadSafeQueue() : isRunning_(true) {}
void push(const T& value) {
std::unique_lock<std::mutex> lock(mutex_);
queue_.push(value);
lock.unlock();
condition_.notify_one();
}
T pop() {
std::unique_lock<std::mutex> lock(mutex_);
condition_.wait(lock, [this] { return !queue_.empty() || !isRunning_; });
if (!queue_.empty()) {
T value = queue_.front();
queue_.pop();
return value;
}
return {}; // 空值表示队列已经关闭
}
bool empty() const {
std::unique_lock<std::mutex> lock(mutex_);
return queue_.empty();
}
bool isRunning() const {
return isRunning_;
}
void stop() {
isRunning_ = false;
condition_.notify_all();
}
std::mutex& getMutex() {
return mutex_;
}
};
// 生产者类
class Producer {
public:
void operator()(ThreadSafeQueue<Packet>& packetQueue) {
for (int i = 0; i < 10; ++i) {
Packet packet;
packet.data = i;
printf("packetQueue.push(%d)\n", (packet.data));
packetQueue.push(packet);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
packetQueue.stop(); // 告知队列停止接受数据
}
};
// 解析类
class Parser {
public:
void operator()(ThreadSafeQueue<Packet>& packetQueue, ThreadSafeQueue<Packet>& parsedQueue) {
while (true) {
Packet packet = packetQueue.pop();
if (!packetQueue.isRunning() && packetQueue.empty()) {
// 生产者已经结束并且队列为空,可以退出循环
break;
}
printf("parsedQueue.push(%d)\n", (packet.data));
// 解析数据的逻辑...
// 这里简单地将数据放入parsedQueue中
parsedQueue.push(packet);
}
}
};
// 处理类
class Processer {
public:
void operator()(ThreadSafeQueue<Packet>& parsedQueue, ThreadSafeQueue<Packet>& processedQueue) {
while (true) {
Packet packet = parsedQueue.pop();
if (!parsedQueue.isRunning() && parsedQueue.empty()) {
// 解析者已经结束并且队列为空,可以退出循环
break;
}
// 处理数据的逻辑...
// 这里简单地将数据放入processedQueue中
printf("processedQueue.push(%d)\n", (packet.data));
processedQueue.push(packet);
}
}
};
int main() {
ThreadSafeQueue<Packet> packetQueue;
ThreadSafeQueue<Packet> parsedQueue;
ThreadSafeQueue<Packet> processedQueue;
Producer producer;
Parser parser;
Processer processer;
std::thread producerThread(producer, std::ref(packetQueue));
std::thread parserThread(parser, std::ref(packetQueue), std::ref(parsedQueue));
std::thread processerThread(processer, std::ref(parsedQueue), std::ref(processedQueue));
producerThread.join();
parserThread.join();
processerThread.join();
return 0;
}
文章来源:https://blog.csdn.net/wq_ocean_/article/details/135042246
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!