【Linux】多线程编程
目录
1. 线程基础知识
详见
5.1 进程、线程基础知识 | 小林codinghttps://xiaolincoding.com/os/4_process/process_base.html#%E7%BA%BF%E7%A8%8B
2. 线程创建
#include <pthread.h>
int pthread_create(pthread_t* thread, const pthread_attr_t* attr,
void* (*start_routine)(void*), void* arg);
// 成功时返回0,失败时返回错误码
// thread 输出型参数,获取新线程ID typedef unsigned long int pthread_t;
// attr 指向新线程的属性,一般使用默认值,通常为NULL
// start_routine 函数指针,新线程将运行的函数
// arg 传给start_routine函数的参数
因为pthread并非Linux系统的默认库,而是POSIX线程库。在Linux中将其作为一个库来使用,因此加上-pthread以显式链接该库。函数在执行错误时的错误信息将作为返回值返回,并不修改系统全局变量errno,当然也无法使用perror打印错误信息。
// mythread.c
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
void* callback(void* arg)
{
printf("I am child thread.\n");
printf("arg value: %d\n", *(int*)arg);
}
int main()
{
// main函数所在的线程为主线程,其余创建的线程为子线程
pthread_t tid;
int num = 10;
// 创建一个子线程
int ret = pthread_create(&tid, NULL, callback, (void*)&num);
if (ret != 0)
{
char* errstr = strerror(ret);
printf("error: %s\n", errstr);
}
printf("I am main thread.\n");
for (int i = 0; i < 5; i++)
{
printf("%d\n", i);
}
sleep(1);
return 0;
}
3. 线程ID(TID)
#include <pthread.h>
pthread_t pthread_self(void);
// 获取当前线程ID
// 这个函数总是成功的
int pthread_equal(pthread_t t1, pthread_t t2);
// 比较两个线程ID是否相等
// 这个函数总是成功的,相等返回非0值,不相等返回0
4. 线程终止
- 子线程调用return,不适用于主线程,因为在main函数中调用return相当于exit函数,表示进程终止。
- 线程调用pthread_exit函数终止自己。
- 线程调用pthread_cancel函数终止自己或其他线程,不是立刻终止,而是到取消点才终止。即线程取消。
#include <pthread.h>
void pthread_exit(void* retval);
// retval 可以指向任何类型的数据,它指向的数据将作为线程退出时的返回值,不关心则设置为NULL
// 不能指向局部变量
// 可以被主线程接收
// mythread.c
#include <pthread.h>
#include <stdio.h>
#include <string.h>
void* callback(void* arg)
{
printf("child thread id: %ld\n", pthread_self());
return NULL; // 子线程终止,等价于pthread_exit(NULL);
}
int main()
{
// 子线程
pthread_t tid;
int ret = pthread_create(&tid, NULL, callback, NULL);
if (ret != 0)
{
char* errstr = strerror(ret);
printf("error: %s\n", errstr);
}
// 主线程
for (int i = 0; i < 5; i++)
{
printf("%d\n", i);
}
printf("tid: %ld, main thread id: %ld\n", tid, pthread_self());
pthread_exit(NULL); // 主线程终止,不会影响其他正常运行的线程
return 0; // 进程终止,等价于exit(0);
}
5. 线程取消
#include <pthread.h>
int pthread_cancel(pthread_t thread);
// 给目标线程发送取消请求,目标线程收到取消请求不会立刻终止,而是执行到一个取消点才会终止
// 取消点:系统规定好的一些系统调用
// 成功时返回0,失败时返回错误码
6. 线程等待
#include <pthread.h>
int pthread_join(pthread_t thread, void** retval);
// 以阻塞的方式等待目标子线程终止,回收子线程资源
// 成功时返回0,失败时返回错误码
// thread 目标子线程ID
// retval 接收子线程退出时的返回值,不关心则设置为NULL
// mythread.c
#include <pthread.h>
#include <stdio.h>
#include <string.h>
int value = 10;
void* callback(void* arg)
{
printf("child thread id: %ld\n", pthread_self());
pthread_exit((void*)&value); // return (void*)&value;
}
int main()
{
// 子线程
pthread_t tid;
int ret = pthread_create(&tid, NULL, callback, NULL);
if (ret != 0)
{
char* errstr = strerror(ret);
printf("error1: %s\n", errstr);
}
// 主线程
for (int i = 0; i < 5; i++)
{
printf("%d\n", i);
}
printf("tid: %ld, main thread id: %ld\n", tid, pthread_self());
// 回收子线程资源
int* thread_retval;
ret = pthread_join(tid, (void**)&thread_retval);
if (ret != 0)
{
char* errstr = strerror(ret);
printf("error2: %s\n", errstr);
}
printf("exit data: %d\n", *thread_retval);
printf("回收子线程资源成功!\n");
pthread_exit(NULL);
return 0;
}
7. 线程分离
#include <pthread.h>
int pthread_detach(pthread_t thread);
// 将目标线程标识为已分离的(detached)线程
// 默认情况下,新创建的线程是可连接的(joinable)的,需要主线程调用pthread_join回收子线程资源
// 已分离的(detached)线程终止时资源被自动释放回系统,当不关心子线程退出时的返回值时,我们可以分离它
// joinable和detached是冲突的
// 成功时返回0,失败时返回错误码
// mythread.c
#include <pthread.h>
#include <stdio.h>
#include <string.h>
void* callback(void* arg)
{
printf("chid thread id: %ld\n", pthread_self());
return NULL;
}
int main()
{
// 子线程
pthread_t tid;
int ret = pthread_create(&tid, NULL, callback, NULL);
if (ret != 0)
{
char* errstr = strerror(ret);
printf("error1: %s\n", errstr);
}
// 主线程
printf("tid: %ld, main thread id: %ld\n", tid, pthread_self());
// 分离子线程
ret = pthread_detach(tid);
if (ret != 0)
{
char* errstr = strerror(ret);
printf("error2: %s\n", errstr);
}
pthread_exit(NULL);
return 0;
}
8. 线程互斥
为了避免多线程并发地访问共享资源时出现问题,可以使用互斥量(mutual exclusion,mutex)来确保对任意共享资源的原子访问。
原子性:指事务的不可分割性,一个事务的所有操作要么不间断地全部被执行,要么一个也没有执行。
互斥量本质是锁,在访问共享资源前对互斥量加锁,在访问完成后解锁。
8.1?初始化互斥量
- 静态初始化
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 用宏来静态初始化互斥量
- 动态初始化
#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t* restrict mutex,
const pthread_mutexattr_t* restrict attr);
// 成功时返回0,失败时返回错误码
// mutex 指向要初始化的互斥量
// attr 指向互斥量的属性,通常设置为NULL,表示以系统默认的属性完成初始化
// restrict,C语言中的一种类型限定符,用于告诉编译器,对象已经被指针所引用,
// 不能通过除该指针外所有其他直接或间接的方式修改该对象的内容。
8.2 销毁互斥量
#include <pthread.h>
int pthread_mutex_destroy(pthread_mutex_t* mutex);
// 静态初始化的互斥量不需要销毁
// 不要销毁一个已经加锁的互斥量
// 已经销毁的互斥量,要确保后面不会有线程再尝试加锁
// 成功时返回0,失败时返回错误码
// mutex 指向要销毁的互斥量
8.3 互斥量加锁和解锁
#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t* mutex);
// 加锁,阻塞的,如果有一个线程加锁了,那么其他的线程只能阻塞等待
int pthread_mutex_trylock(pthread_mutex_t* mutex);
// 尝试加锁,如果加锁失败,不会阻塞,调用失败返回错误码
int pthread_mutex_unlock(pthread_mutex_t* mutex);
// 解锁
// 成功时返回0,失败时返回错误码
多线程模拟卖票:
3个窗口一共卖20张票:
// mythread.c
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
// 共享变量
int tickets = 20;
// 互斥量
pthread_mutex_t mutex;
// 卖票函数
void* sellTickets(void* arg)
{
char* id = (char*)arg;
while (1)
{
usleep(1000); // 单位是微秒
// 加锁
pthread_mutex_lock(&mutex);
if (tickets > 0)
{
printf("%s正在卖第%d张门票\n", id, 20 - tickets + 1);
tickets--;
}
else
{
// 解锁
pthread_mutex_unlock(&mutex);
break;
}
// 解锁
pthread_mutex_unlock(&mutex);
}
return NULL;
}
int main()
{
// 初始化互斥量
pthread_mutex_init(&mutex, NULL);
// 创建3个子线程
pthread_t tid1, tid2, tid3;
pthread_create(&tid1, NULL, sellTickets, "thread 1");
pthread_create(&tid2, NULL, sellTickets, "thread 2");
pthread_create(&tid3, NULL, sellTickets, "thread 3");
// 回收子线程资源
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
pthread_join(tid3, NULL);
// 销毁互斥量
pthread_mutex_destroy(&mutex);
// 退出主线程
pthread_exit(NULL);
return 0;
}
9. 可重入和线程安全
概念:
- 线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。
- 重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。
常见的线程不安全的情况:
- 不保护共享变量的函数。
- 函数状态随着被调用,状态发生变化的函数。
- 返回指向静态变量指针的函数。
- 调用线程不安全函数的函数。
常见的线程安全的情况:
- 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的。
- 类或者接口对于线程来说都是原子操作。
- 多个线程之间的切换不会导致该接口的执行结果存在二义性。
常见的不可重入的情况:
- 调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的。
- 调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构。
- 可重入函数体内使用了静态的数据结构。
常见的可重入的情况:
- 不使用全局变量或静态变量。
- 不使用用malloc或者new开辟出的空间。
- 不调用不可重入函数。
- 不返回静态或全局数据,所有数据都有函数的调用者提供。
- 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据。
可重入与线程安全的联系:
- 函数是可重入的,那就是线程安全的。
- 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题。
- 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。
可重入与线程安全的区别:
- 可重入函数是线程安全函数的一种。
- 线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
- 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的。
10. 线程同步之条件变量
例如一个线程访问队列时,发现队列为空,它只能等待,直到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待“条件变量的条件成立”而挂起;另一个线程使“条件成立”(给出条件成立信号)。为了防止竞争,条件变量的使用总是和一个互斥量结合在一起。
10.1 初始化条件变量
- 静态初始化
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // 用宏来静态初始化条件变量
- 动态初始化
#include <pthread.h>
int pthread_cond_init(pthread_cond_t* restrict cond,
const pthread_condattr_t* restrict attr);
// 成功时返回0,失败时返回错误码
// cond 指向要初始化的条件变量
// attr 指向条件变量的属性,通常设置为NULL,表示以系统默认的属性完成初始化
// restrict,C语言中的一种类型限定符,用于告诉编译器,对象已经被指针所引用,
// 不能通过除该指针外所有其他直接或间接的方式修改该对象的内容。
10.2 销毁条件变量
#include <pthread.h>
int pthread_cond_destroy(pthread_cond_t* cond);
// 静态初始化的条件变量不需要销毁
// 成功时返回0,失败时返回错误码
// cond 指向要销毁的条件变量
10.3 等待条件成立
当条件不成立时,条件变量可以阻塞当前线程,所有被阻塞的线程会构成一个等待队列。
#include <pthread.h>
int pthread_cond_timedwait(pthread_cond_t* restrict cond,
pthread_mutex_t* restrict mutex,
const struct timespec* restrict abstime);
// 在abstime指定的时间内阻塞线程,直到条件成立
// 超时返回ETIMEDOUT
int pthread_cond_wait(pthread_cond_t* restrict cond,
pthread_mutex_t* restrict mutex);
// 永久阻塞线程,直到条件成立
// 成功时返回0,失败时返回错误码
// cond 指向已经初始化的条件变量
// mutex 指向已经加锁的互斥量
// abstime 指向表示绝对时间的结构体,系统当前时间+等待时间
线程对互斥量加锁后准备执行任务,发现条件不成立,这时就需要阻塞线程,等待条件成立。如果什么都不做,直接等待,这个线程占着锁,其他线程就申请不到锁了。所以要把加锁的互斥量传给函数,函数被调用后会阻塞线程并自动释放锁,等待条件成立或超时,重新加锁并解除阻塞,然后函数返回。
10.4 唤醒等待
#include<pthread.h>
int pthread_cond_signal(pthread_cond_t *cond);
// 唤醒一个等待cond条件发生的线程
int pthread_cond_broadcast(pthread_cond_t *cond);
// 唤醒所有等待cond条件发生的线程
// 成功时返回0,失败时返回错误码
11. 基于阻塞队列的生产者-消费者模型
- 生产者在生产数据后,不直接给消费者,而是将数据放入到缓冲区
- 消费者从缓冲区中取出数据
- 任何时刻,只能有一个生产者或消费者可以访问缓冲区,所以生产者和生产者之间、消费者和消费者之间、生产者和消费者之间是互斥关系
- 缓冲区空时,消费者必须等待生产者生产数据,缓冲区满时,生产者必须等待消费者取出数据,所以生产者和消费者之间又是同步关系
把缓冲区设计成一个阻塞队列,有先进先出的特性,并且,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出。
321原则:
- 3种关系:生产者和生产者之间是互斥关系
? ? ? ? ? ? ? ? ?消费者和消费者之间是互斥关系
? ? ? ? ? ? ? ? ?生产者和消费者之间是同步+互斥关系 - 2种角色:生产者、消费者
- 1个交易场所:阻塞队列
生产者-消费者模型的优点:
- 解耦:有了缓冲区之后,生产者与消费者之间的直接依赖关系就减少了,如果二者的行为发生变化(或者代码形式改变),就不会直接影响到对方。
- 支持并发:如果直接让生产者与消费者之间进行联系,那么二者在代码中就是顺序执行的,就是生产者生产一个,消费者消费掉这个货品。如果双方有一方出现问题,那么程序就会一直阻塞,大大降低效率。所以加上缓冲区后,双方就不必再等待对方完成动作。
- 支持忙闲不均:如果生产者生产的速度不均衡,导致时快时慢,让消费者来不及消费,那么缓冲区的作用就体现出来了,让消费者来不及处理的数据先放在缓冲区中,等消费者有时间再去处理。减少忙的时间太忙,闲的时间太闲的现象。
11.1 生产整型变量
// BlockingQueue.hpp
#pragma once
#include <queue>
#include <pthread.h>
template<typename T>
class BlockingQueue
{
public:
BlockingQueue(int capacity = 5)
: _capacity(capacity)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_producerCond, nullptr);
pthread_cond_init(&_consumerCond, nullptr);
}
bool isFull() { return _q.size() == _capacity; }
bool isEmpty() { return _q.empty(); }
void push(const T& in)
{
pthread_mutex_lock(&_mutex);
while (isFull()) // 用while不用if,防止虚假唤醒
{
pthread_cond_wait(&_producerCond, &_mutex);
}
// 缓冲区不满就放入
_q.push(in);
// 生产完,缓冲区一定不是空的,唤醒消费者线程
// 可以加策略,比如缓冲区内数据的个数>容量的一半时,再唤醒
// if(_q.size() >= _cap/2) pthread_cond_signal(&_consumerCond);
// 我们这里就不加策略了,直接唤醒
pthread_cond_signal(&_consumerCond);
pthread_mutex_unlock(&_mutex);
// pthread_cond_signal(&_consumerCond); // 唤醒放在解锁前面和后面均可
}
void pop(T* out)
{
pthread_mutex_lock(&_mutex);
while (isEmpty()) // 用while不用if,防止虚假唤醒
{
pthread_cond_wait(&_consumerCond, &_mutex);
}
// 缓冲区不空就取出
*out = _q.front();
_q.pop();
// 消费完,缓冲区一定不是满的,唤醒生产者线程
// 这里也可以加策略,但是我们就不加了,直接唤醒
pthread_cond_signal(&_producerCond);
pthread_mutex_unlock(&_mutex);
// pthread_cond_signal(&_producerCond); // 唤醒放在解锁前面和后面均可
}
~BlockingQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_consumerCond);
pthread_cond_destroy(&_producerCond);
}
private:
std::queue<T> _q;
int _capacity;
pthread_mutex_t _mutex;
pthread_cond_t _producerCond; // 生产者对应的条件变量,缓冲区满就等待
pthread_cond_t _consumerCond; // 消费者对应的条件变量,缓冲区空就等待
};
// main.cc
#include "BlockingQueue.hpp"
#include <iostream>
#include <cstdlib>
#include <ctime>
#include <unistd.h>
// 生产者线程函数
void* producer(void* arg)
{
BlockingQueue<int>* bq = static_cast<BlockingQueue<int>*>(arg);
while (true)
{
sleep(1);
// 1. 通过某种渠道获取数据
int data = rand() % 10 + 1; // 生产1-10的数字
// 2. 将数据放入阻塞队列中
bq->push(data);
std::cout << "producer data: " << data << std::endl;
}
}
// 消费者线程函数
void* consumer(void* arg)
{
BlockingQueue<int>* bq = static_cast<BlockingQueue<int>*>(arg);
while (true)
{
int data = 0;
// 1. 从阻塞队列中取出数据
bq->pop(&data);
// 2. 结合某种业务逻辑处理数据
// 我们这里写一行打印代码当作处理数据了
std::cout << "consumer data: " << data << std::endl;
}
}
int main()
{
srand((unsigned int)time(nullptr) ^ getpid()); // 设置随机数种子
BlockingQueue<int>* bq = new BlockingQueue<int>();
pthread_t p[3], c[2];
pthread_create(&p[0], nullptr, producer, bq);
pthread_create(&p[1], nullptr, producer, bq);
pthread_create(&p[2], nullptr, producer, bq);
pthread_create(&c[0], nullptr, consumer, bq);
pthread_create(&c[1], nullptr, consumer, bq);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
pthread_join(p[2], nullptr);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
delete bq;
return 0;
}
11.2 生产自定义类对象
BlockingQueue.hpp内容不变,再增加一个Task.hpp用来定义Task类,main.cc内容稍微改变。
// BlockingQueue.hpp
#pragma once
#include <queue>
#include <pthread.h>
template<typename T>
class BlockingQueue
{
public:
BlockingQueue(int capacity = 5)
: _capacity(capacity)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_producerCond, nullptr);
pthread_cond_init(&_consumerCond, nullptr);
}
bool isFull() { return _q.size() == _capacity; }
bool isEmpty() { return _q.empty(); }
void push(const T& in)
{
pthread_mutex_lock(&_mutex);
while (isFull()) // 用while不用if,防止虚假唤醒
{
pthread_cond_wait(&_producerCond, &_mutex);
}
// 缓冲区不满就放入
_q.push(in);
// 生产完,缓冲区一定不是空的,唤醒消费者线程
// 可以加策略,比如缓冲区内数据的个数>容量的一半时,再唤醒
// if(_q.size() >= _cap/2) pthread_cond_signal(&_consumerCond);
// 我们这里就不加策略了,直接唤醒
pthread_cond_signal(&_consumerCond);
pthread_mutex_unlock(&_mutex);
// pthread_cond_signal(&_consumerCond); // 唤醒放在解锁前面和后面均可
}
void pop(T* out)
{
pthread_mutex_lock(&_mutex);
while (isEmpty()) // 用while不用if,防止虚假唤醒
{
pthread_cond_wait(&_consumerCond, &_mutex);
}
// 缓冲区不空就取出
*out = _q.front();
_q.pop();
// 消费完,缓冲区一定不是满的,唤醒生产者线程
// 这里也可以加策略,但是我们就不加了,直接唤醒
pthread_cond_signal(&_producerCond);
pthread_mutex_unlock(&_mutex);
// pthread_cond_signal(&_producerCond); // 唤醒放在解锁前面和后面均可
}
~BlockingQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_consumerCond);
pthread_cond_destroy(&_producerCond);
}
private:
std::queue<T> _q;
int _capacity;
pthread_mutex_t _mutex;
pthread_cond_t _producerCond; // 生产者对应的条件变量,缓冲区满就等待
pthread_cond_t _consumerCond; // 消费者对应的条件变量,缓冲区空就等待
};
// Task.hpp
#pragma once
#include <string>
class Task
{
public:
Task() {}
Task(int x, int y, char op)
: _x(x)
, _y(y)
, _op(op)
, _result(0)
, _exitCode(0)
{}
void operator()()
{
switch (_op)
{
case '+':
_result = _x + _y;
break;
case '-':
_result = _x - _y;
break;
case '*':
_result = _x * _y;
break;
case '/':
{
if (_y == 0)
_exitCode = -1;
else
_result = _x / _y;
}
break;
case '%':
{
if (_y == 0)
_exitCode = -2;
else
_result = _x % _y;
}
break;
default:
break;
}
}
std::string formatArg()
{
return std::to_string(_x) + _op + std::to_string(_y) + "=";
}
std::string formatRes()
{
return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";
}
~Task() {}
private:
int _x;
int _y;
char _op;
int _result;
int _exitCode;
};
// main.cc
#include "BlockingQueue.hpp"
#include "Task.hpp"
#include <iostream>
#include <cstdlib>
#include <ctime>
#include <unistd.h>
// 生产者线程函数
void* producer(void* arg)
{
BlockingQueue<Task>* bq = static_cast<BlockingQueue<Task>*>(arg);
std::string ops = "+-*/%";
while (true)
{
// 1. 通过某种渠道获取数据
int x = rand() % 20 + 1; // x的范围是1~20
int y = rand() % 10 + 1; // y的范围是1~10
char op = ops[rand() % ops.size()]; // 随机取一个运算符
// 2. 将数据放入阻塞队列中
Task t(x, y, op);
bq->push(t);
std::cout << "producer Task: " << t.formatArg() << "?" << std::endl;
}
}
// 消费者线程函数
void* consumer(void* arg)
{
BlockingQueue<Task>* bq = static_cast<BlockingQueue<Task>*>(arg);
while (true)
{
sleep(1);
Task t;
// 1. 从阻塞队列中取出数据
bq->pop(&t);
// 2. 结合某种业务逻辑处理数据
t();
std::cout << "consumer Task: " << t.formatArg() << t.formatRes() << std::endl;
}
}
int main()
{
srand((unsigned int)time(nullptr) ^ getpid()); // 设置随机数种子
BlockingQueue<Task>* bq = new BlockingQueue<Task>();
pthread_t p[3], c[2];
pthread_create(&p[0], nullptr, producer, bq);
pthread_create(&p[1], nullptr, producer, bq);
pthread_create(&p[2], nullptr, producer, bq);
pthread_create(&c[0], nullptr, consumer, bq);
pthread_create(&c[1], nullptr, consumer, bq);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
pthread_join(p[2], nullptr);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
delete bq;
return 0;
}
12. 线程同步之信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
条件变量和信号量的区别:
- 条件变量:在临界区内部做判断,决定是否等待
- 信号量:不进入临界区就知道临界资源的使用情况,临界资源为0就等待
12.1 初始化信号量
#include <semaphore.h>
int sem_init(sem_t* sem, int pshared, unsigned int value);
// 成功时返回0,失败时返回-1并设置errno
// sem 指向要初始化的信号量
// pshared 0表示线程间共享,非0表示进程间共享
// value 信号量初始值,信号量的值表示资源的数量
12.2 销毁信号量
#include <semaphore.h>
int sem_destroy(sem_t* sem);
// 成功时返回0,失败时返回-1并设置errno
// sem 指向要销毁的信号量
12.3 等待信号量(P操作)
#include <semaphore.h>
int sem_wait(sem_t* sem);
// 如果信号量的值>0,信号量的值-1
// 如果信号量的值=0,永久阻塞线程,直到信号量的值>0,信号量的值-1
int sem_trywait(sem_t* sem);
// 如果信号量的值>0,信号量的值-1
// 如果信号量的值=0,不阻塞线程,调用失败
int sem_timedwait(sem_t* sem, const struct timespec* abs_timeout);
// 如果信号量的值>0,信号量的值-1
// 如果信号量的值=0,在abs_timeout指定的时间内阻塞线程,直到信号量的值>0,信号量的值-1
// 超时则调用失败
// 成功时返回0,失败时返回-1并设置errno
12.4 发布信号量(V操作)
#include <semaphore.h>
int sem_post(sem_t* sem);
// 信号量的值+1
// 成功时返回0,失败时返回-1并设置errno
13. 基于环形队列的生产者-消费者模型
用数组模拟环形队列:
- 初始时:head = tail = 0
- 队首下标进1:head = (head + 1) % capacity
- 队尾下标进1:tail = (tail + 1) % capacity
- 队列长度:(tail + capacity - head) % capacity
生产者-消费者模型分析:
- 生产者在生产数据后,不直接给消费者,而是将数据放入环形队列的tail
- 消费者从环形队列的head中取出数据
- 只要生产者和消费者访问不同的区域(不是队空 / 队满),生产和消费可以同时进行,但只能有一个生产者在生产,也只能有一个消费者在消费
- 队空时,消费者必须等待生产者生产数据,队满时,生产者必须等待消费者取出数据,所以生产者和消费者之间是同步关系
321原则:
- 3种关系:生产者和生产者之间是互斥关系
? ? ? ? ? ? ? ? ?消费者和消费者之间是互斥关系
? ? ? ? ? ? ? ? ?生产者和消费者之间是同步关系 - 2种角色:生产者、消费者
- 1个交易场所:环形队列
两种生产者-消费者模型的区别:
- 基于阻塞队列的生产者-消费者模型:任何时刻,只能有一个生产者或消费者可以访问缓冲区。临界资源是被整体使用的,所有线程之间都有互斥关系,只需要一把锁就可以实现互斥。
- 基于环形队列的生产者-消费者模型:只要生产者和消费者访问不同的区域(不是队空 / 队满),生产和消费可以同时进行,但只能有一个生产者在生产,也只能有一个消费者在消费。临界资源不是被整体使用的,生产者使用tail,消费者使用head,它们可以同时进行。生产者和消费者各需要一把锁才能实现生产者和生产者之间的互斥、消费者和消费者之间的互斥。
13.1 生产整型变量
// RingQueue.hpp
#pragma once
#include <vector>
#include <pthread.h>
#include <semaphore.h>
template<typename T>
class RingQueue
{
public:
RingQueue(int capacity = 5)
: _ring(capacity)
, _capacity(capacity)
, _head(0)
, _tail(0)
{
sem_init(&_data_sem, 0, 0);
sem_init(&_space_sem, 0, capacity);
pthread_mutex_init(&_c_mutex, nullptr);
pthread_mutex_init(&_p_mutex, nullptr);
}
void push(const T& in)
{
// 先加锁,再P操作会怎么样?线程申请到锁之后,如果申请信号量失败,会带着锁阻塞,其他同类型的线程还会竞争锁但是竞争不到锁
// 先P操作,再加锁会怎么样?线程如果申请信号量失败,就不会去申请锁,这样会减少申请锁的次数,申请锁也是有代价的
// 先P操作,再加锁,效率更高
P(_space_sem); // 空闲的空间-1
Lock(_p_mutex);
_ring[_tail] = in;
_tail = (_tail + 1) % _capacity;
Unlock(_p_mutex);
V(_data_sem); // 存放的数据+1
}
void pop(T* out)
{
// 先P操作,再加锁,效率更高
P(_data_sem); // 存放的数据-1
Lock(_c_mutex);
*out = _ring[_head];
_head = (_head + 1) % _capacity;
Unlock(_c_mutex);
V(_space_sem); // 空闲的空间+1
}
~RingQueue()
{
sem_destroy(&_data_sem);
sem_destroy(&_space_sem);
pthread_mutex_destroy(&_c_mutex);
pthread_mutex_destroy(&_p_mutex);
}
private:
void P(sem_t &s)
{
sem_wait(&s);
}
void V(sem_t &s)
{
sem_post(&s);
}
void Lock(pthread_mutex_t &m)
{
pthread_mutex_lock(&m);
}
void Unlock(pthread_mutex_t &m)
{
pthread_mutex_unlock(&m);
}
std::vector<T> _ring; // 数组模拟环形队列
int _capacity; // 环形队列的容量
int _tail; // 生产者的位置
int _head; // 消费者的位置
sem_t _space_sem; // 生产者关心环形队列中空闲的空间,只要还有空间就能生产
sem_t _data_sem; // 消费者关心环形队列中存放的数据,只要还有数据就能消费
// 如果是单生产单消费不需要锁,多生产多消费需要两把锁,因为生产者和生产者之间是互斥的,消费者和消费者之间是互斥的
pthread_mutex_t _c_mutex; // 生产者锁
pthread_mutex_t _p_mutex; // 消费者锁
};
// main.cc
#include "RingQueue.hpp"
#include <iostream>
#include <cstdlib>
#include <ctime>
#include <unistd.h>
// 生产者线程函数
void* producer(void* arg)
{
RingQueue<int>* rq = static_cast<RingQueue<int>*>(arg);
while (true)
{
sleep(1);
// 1. 通过某种渠道获取数据
int data = rand() % 10 + 1; // 生产1-10的数字
// 2. 将数据放入环形队列中
rq->push(data);
std::cout << "producer data: " << data << std::endl;
}
}
// 消费者线程函数
void* consumer(void* arg)
{
RingQueue<int>* rq = static_cast<RingQueue<int>*>(arg);
while (true)
{
int data = 0;
// 1. 从环形队列中取出数据
rq->pop(&data);
// 2. 结合某种业务逻辑处理数据
// 我们这里写一行打印代码当作处理数据了
std::cout << "consumer data: " << data << std::endl;
}
}
int main()
{
srand((unsigned int)time(nullptr) ^ getpid()); // 设置随机数种子
RingQueue<int>* rq = new RingQueue<int>();
pthread_t p[3], c[2];
pthread_create(&p[0], nullptr, producer, rq);
pthread_create(&p[1], nullptr, producer, rq);
pthread_create(&p[2], nullptr, producer, rq);
pthread_create(&c[0], nullptr, consumer, rq);
pthread_create(&c[1], nullptr, consumer, rq);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
pthread_join(p[2], nullptr);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
delete rq;
return 0;
}
13.2 生产自定义类对象
RingQueue.hpp内容不变,再增加一个Task.hpp用来定义Task类,main.cc内容稍微改变。
// RingQueue.hpp
#pragma once
#include <vector>
#include <pthread.h>
#include <semaphore.h>
template<typename T>
class RingQueue
{
public:
RingQueue(int capacity = 5)
: _ring(capacity)
, _capacity(capacity)
, _head(0)
, _tail(0)
{
sem_init(&_data_sem, 0, 0);
sem_init(&_space_sem, 0, capacity);
pthread_mutex_init(&_c_mutex, nullptr);
pthread_mutex_init(&_p_mutex, nullptr);
}
void push(const T& in)
{
// 先加锁,再P操作会怎么样?线程申请到锁之后,如果申请信号量失败,会带着锁阻塞,其他同类型的线程还会竞争锁但是竞争不到锁
// 先P操作,再加锁会怎么样?线程如果申请信号量失败,就不会去申请锁,这样会减少申请锁的次数,申请锁也是有代价的
// 先P操作,再加锁,效率更高
P(_space_sem); // 空闲的空间-1
Lock(_p_mutex);
_ring[_tail] = in;
_tail = (_tail + 1) % _capacity;
Unlock(_p_mutex);
V(_data_sem); // 存放的数据+1
}
void pop(T* out)
{
// 先P操作,再加锁,效率更高
P(_data_sem); // 存放的数据-1
Lock(_c_mutex);
*out = _ring[_head];
_head = (_head + 1) % _capacity;
Unlock(_c_mutex);
V(_space_sem); // 空闲的空间+1
}
~RingQueue()
{
sem_destroy(&_data_sem);
sem_destroy(&_space_sem);
pthread_mutex_destroy(&_c_mutex);
pthread_mutex_destroy(&_p_mutex);
}
private:
void P(sem_t &s)
{
sem_wait(&s);
}
void V(sem_t &s)
{
sem_post(&s);
}
void Lock(pthread_mutex_t &m)
{
pthread_mutex_lock(&m);
}
void Unlock(pthread_mutex_t &m)
{
pthread_mutex_unlock(&m);
}
std::vector<T> _ring; // 数组模拟环形队列
int _capacity; // 环形队列的容量
int _tail; // 生产者的位置
int _head; // 消费者的位置
sem_t _space_sem; // 生产者关心环形队列中空闲的空间,只要还有空间就能生产
sem_t _data_sem; // 消费者关心环形队列中存放的数据,只要还有数据就能消费
// 如果是单生产单消费不需要锁,多生产多消费需要两把锁,因为生产者和生产者之间是互斥的,消费者和消费者之间是互斥的
pthread_mutex_t _c_mutex; // 生产者锁
pthread_mutex_t _p_mutex; // 消费者锁
};
// Task.hpp
#pragma once
#include <string>
class Task
{
public:
Task() {}
Task(int x, int y, char op)
: _x(x)
, _y(y)
, _op(op)
, _result(0)
, _exitCode(0)
{}
void operator()()
{
switch (_op)
{
case '+':
_result = _x + _y;
break;
case '-':
_result = _x - _y;
break;
case '*':
_result = _x * _y;
break;
case '/':
{
if (_y == 0)
_exitCode = -1;
else
_result = _x / _y;
}
break;
case '%':
{
if (_y == 0)
_exitCode = -2;
else
_result = _x % _y;
}
break;
default:
break;
}
}
std::string formatArg()
{
return std::to_string(_x) + _op + std::to_string(_y) + "=";
}
std::string formatRes()
{
return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";
}
~Task() {}
private:
int _x;
int _y;
char _op;
int _result;
int _exitCode;
};
// main.cc
#include "RingQueue.hpp"
#include "Task.hpp"
#include <iostream>
#include <cstdlib>
#include <ctime>
#include <unistd.h>
// 生产者线程函数
void* producer(void* arg)
{
RingQueue<Task>* rq = static_cast<RingQueue<Task>*>(arg);
std::string ops = "+-*/%";
while (true)
{
// 1. 通过某种渠道获取数据
int x = rand() % 20 + 1; // x的范围是1~20
int y = rand() % 10 + 1; // y的范围是1~10
char op = ops[rand() % ops.size()]; // 随机取一个运算符
// 2. 将数据放入环形队列中
Task t(x, y, op);
rq->push(t);
std::cout << "producer Task: " << t.formatArg() << "?" << std::endl;
}
}
// 消费者线程函数
void* consumer(void* arg)
{
RingQueue<Task>* rq = static_cast<RingQueue<Task>*>(arg);
while (true)
{
sleep(1);
Task t;
// 1. 从环形队列中取出数据
rq->pop(&t);
// 2. 结合某种业务逻辑处理数据
t();
std::cout << "consumer Task: " << t.formatArg() << t.formatRes() << std::endl;
}
}
int main()
{
srand((unsigned int)time(nullptr) ^ getpid()); // 设置随机数种子
RingQueue<Task>* rq = new RingQueue<Task>();
pthread_t p[3], c[2];
pthread_create(&p[0], nullptr, producer, rq);
pthread_create(&p[1], nullptr, producer, rq);
pthread_create(&p[2], nullptr, producer, rq);
pthread_create(&c[0], nullptr, consumer, rq);
pthread_create(&c[1], nullptr, consumer, rq);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
pthread_join(p[2], nullptr);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
delete rq;
return 0;
}
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!