除了线程互斥,我们还有线程同步,来康康吧
[TOC]
1.为什么需要同步 在部分条件下,互斥是正确的,但是不合理。比如食堂打饭的时候,食堂阿姨一次只能给一个人打饭,于是就选择通过竞争来获得打饭的权利。此时就会出现有些线程因为优先级过低或者CPU调度问题,一直打不到饭,于是就出现了饥饿问题 。
这是因为我们对多线程访问同一个资源没有进行限制,全靠CPU调度来决定运行顺序;所以我们需要对线程进行一定的控制,这就是线程同步 的概念
饥饿问题:某一个线程一直无法申请到某种资源(比如因为优先级过低) 同步:保证数据安全(临界资源访问)的前提下,让线程根据一定条件和顺序访问临界资源,从而避免饥饿问题 竞态条件:因为时序问题(CPU调度)而导致程序异常; 2.生产消费模型 这个模型其实很简单,消费者去超市购买东西,生产者把商品投放到超市中。
这时候就不需要消费者直接去找工厂问他xx东西又没有生产,他需要购买;而是转去超市里面购买xx东西; 如果xx东西没有货了,超市就会通知生产者进行补货。如果超市里面的货架已经满了,就通知生产者不需要继续生产了; 当商品没货了,超市会告知消费者这个东西没货,消费者会停止消费行为;而生产者补货了之后,超市就会通知消费者让他来购买
在基础模式中,消费者要想知道一个东西有没有货,需要去超市里面看(相当于轮循检测)
我们可以引入一个通知方式 ,比如超市开放一个微信公众号,告知消费者xx物品是否有货,以及告知消费者什么时候需要补货,此时就不需要消费者和生产者不断询问 超市关于一个商品的情况了!这就相当于线程同步
!
2.1 生产者和消费者的关系 下面提到的是普适情况
消费者有多个,他们之间是竞争关系(互斥)竞争商品的购买 生产者有多个,他们之间是竞争关系(互斥)竞争超市的货架 消费者和生产者之间,既有互斥关系,也是同步关系(需要生产者供货了之后,消费者才能消费)这两个关系并不冲突! 除了上面提到的3种生产关系 ,还有下面俩点
在实际程序中,消费者和生产者都是由线程承担的(2种角色 )
超市是内存中特定的一个数据结构,也是临界资源(1个交易场所 )
我们可以用321原则 来快速记住这几条,这样就记住了生产消费模型!👍
2.2 以简单代码为例 在旧模式中,main函数调用另外一个函数,想获得返回值,需要等这个函数运行完毕;好比消费者购买东西,需要去找厂家并等待厂家生产……
而在生产消费者模型中,main作为主线程,只需要把待处理的数据丢进缓冲区;而线程B从缓冲区中取出数据,处理完毕后放回缓冲区。main可以先执行其他代码,过一会再过来拿线程B处理好的结果。
这就实现了生产和消费的解耦 !
2.3 并发 生产消费模型的并发,更多的体现在消费者在处理任务的同时,生产者可以生产任务;
线程切换的成本低于进程,由此便提高了数据处理的效率
接下来我们就要解决下面这些问题😁
1 2 3 1.如何让多个消费者线程等待呢?又如何让消费者线程被唤醒呢? 2.如何让多个生产者线程等待呢?又如何让生产者线程被唤醒呢? 3.如何衡量消费者和生产者所关心的条件是否就绪呢?
而前面提到的通知方式,在linux系统中,就是条件变量
了!
3.条件变量接口 3.1 init/destroy 基本的接口和pthread库的其他接口很相似,都是一样的用法;其中attr
也是设置条件变量的属性,这里置为nullptr即可
1 2 3 4 5 6 #include <pthread.h> int pthread_cond_destroy (pthread_cond_t *cond) ;int pthread_cond_init (pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr) ;pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
3.2 pthread_cond_wait 这两个接口都是让线程在一个条件变量下进行等待,其中timewait
接口可以设置等待的时间(超时了就不等了)
条件变量也是临界资源,所以这里需要一个mutex锁来保证条件变量 读写的原子性
1 2 3 4 5 6 7 #include <pthread.h> int pthread_cond_timedwait (pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime) ;int pthread_cond_wait (pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex) ;
3.3 pthrea_cond_signal/broadcast 这个接口的作用是给在条件变量下等待的线程发信号
1 2 3 4 #include <pthread.h> int pthread_cond_broadcast (pthread_cond_t *cond) ;int pthread_cond_signal (pthread_cond_t *cond) ;
其中broadcast
是给在当前条件变量等待的所有线程 发信号,而signal
是发送信号,只唤醒一个线程;
如果调用成功,这两个函数都会返回0;否则返回错误码
1 2 3 RETURN VALUE If successful, the pthread_cond_broadcast() and pthread_cond_signal() functions shall return zero; otherwise, an error number shall be returned to indicate the error.
3.4 代码示例 下面这个代码可以很好的演示上面提到的多个接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 #include <iostream> #include <string.h> #include <signal.h> #include <pthread.h> #include <thread> #include <unistd.h> #include <sys/types.h> #include <sys/syscall.h> using namespace std;pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;pthread_cond_t cond;volatile bool quit = false ;void *func (void * arg) { while (!quit) { pthread_cond_wait (&cond,&mutex); cout << "thread is running... " << (char *)arg << endl; } cout << "thread quit... " << (char *)arg << endl; } int main () { pthread_cond_init (&cond,nullptr ); pthread_t t1,t2,t3; pthread_create (&t1,nullptr ,func,(void *)"t1" ); pthread_create (&t2,nullptr ,func,(void *)"t2" ); pthread_create (&t3,nullptr ,func,(void *)"t3" ); char c; while (1 ) { cout << "[a/b]$ " ; cin >> c; if (c=='a' ) { pthread_cond_signal (&cond); } else if (c=='b' ) { pthread_cond_broadcast (&cond); } else { quit = true ; break ; } usleep (500 ); } cout << "main break: " << quit << endl; sleep (1 ); pthread_cond_broadcast (&cond); pthread_join (t1,nullptr ); pthread_join (t2,nullptr ); pthread_join (t3,nullptr ); return 0 ; }
每次输入a,就signal
会唤醒一个线程;每次输入b,会调用broadcast
,唤醒当前所有线程
1 2 3 4 5 6 7 8 9 10 11 [muxue@bt-7274:~/git/linux/code/22-12-25_线程同步]$ ./test [a/b]$ a thread is running... t1 [a/b]$ a thread is running... t2 [a/b]$ a thread is running... t3 [a/b]$ b thread is running... t1 thread is running... t2 thread is running... t3
3.4.1 小bug 上面的代码示例,会出现下面的问题,即我们输入除了a和b以外的所有字符,都应该会把全局变量quit
改成true,让三个线程都退出
但观察到的现象却是只有一个线程退出了,其他线程阻塞等待了
1 2 3 4 5 6 [muxue@bt-7274:~/git/linux/code/22-12-25_线程同步]$ ./test [a/b]$ q main break: 1 thread is running... t3 thread quit... t3
这是因为pthread_cond_wait
里面进行了独特的操作 ,即等待之前,它会释放锁,等待之后,他会重新申请锁
1 2 3 4 5 6 7 8 9 10 int pthread_cond_wait (pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex) { pthread_mutex_unlock(mutex); pthread_mutex_lock(mutex); }
第一个退出的线程,退出之前申请了锁却没有释放,于是就导致其他线程在条件满足后 ,没有办法申请锁,只能阻塞等待!
3.4.2 修正 修正的方法很简单,我们只需要在while(!quit)
循环的退出条件满足之后,释放一下锁,就OK了!
1 2 3 4 5 6 7 8 9 10 11 void *func (void * arg) { while (!quit) { pthread_cond_wait (&cond,&mutex); cout << "thread is running... " << (char *)arg << endl; } pthread_mutex_unlock (&mutex); cout << "thread quit... " << (char *)arg << endl; }
此时就能看到,所有线程都正常退出了!
1 2 3 4 5 6 7 8 9 10 [muxue@bt-7274:~/git/linux/code/22-12-25_线程同步]$ ./test [a/b]$ q main break: 1 thread is running... t1 thread quit... t1 thread is running... t2 thread quit... t2 thread is running... t3 thread quit... t3 [muxue@bt-7274:~/git/linux/code/22-12-25_线程同步]$
3.4.3 典型错误 根据这点,就能引出一个比较典型的错误
1 2 3 4 5 6 7 pthread_mutex_lock(&mutex); while (condition_is_false) { pthread_mutex_unlock(&mutex); pthread_cond_wait(&cond); pthread_mutex_lock(&mutex); } pthread_mutex_unlock(&mutex);
两次申请同一把锁,就好比自己把自己绊倒了😂我们要避免写出这样的错误代码!
4.阻塞队列-生产消费模型实例 这个队列的作用,就是提供一个超市,供生产者和消费者进行数据的交换
生产者,往队列里面push数据 消费者,从队列里面pop数据 看起来有些类似于管道,同样的,生产者和消费者在读取阻塞队列的时候,不仅需要保证自己的操作是原子操作,还需要做到一定的访问控制
;即消费者在队列空的时候不能继续pop,生产者在队列满的时候不能继续push
此时,我们还可以引入一个微信公众号
,也就是一定的通知方式:不要让生产者、消费者疯狂检测阻塞队列,而是引入条件变量,在队列不为空的时候,通知消费者;在队列不为满的时候,通知生产者;这样就达到了线程之间的同步。
4.1 成员变量 要实现阻塞队列,我们首先需要理清楚需要什么成员变量,来保护该队列
用于访问控制的锁,同一时刻只能有一个线程访问队列 用户线程同步的条件变量,因为我们需要在不同的条件下通知不同的人,所以需要2个条件变量 一个队列,为了方便,采用std::queue
,这样就不用自己造轮子了 理清楚了之后,就可以来写成员变量啦;我采用了模板类型,这样阻塞队列就可以用来存放任何我们想要的类型了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 template <class T >class BlockQueue { private : queue<T> _bq; size_t _size; pthread_mutex_t _mutex; pthread_cond_t _proInf; pthread_cond_t _conInf; public : BlockQueue (int sz=5 ) :_size(sz) { pthread_mutex_init (&_mutex,nullptr ); pthread_cond_init (&_proInf,nullptr ); pthread_cond_init (&_conInf,nullptr ); } ~BlockQueue () { pthread_mutex_destroy (&_mutex); pthread_cond_destroy (&_proInf); pthread_cond_destroy (&_conInf); } };
你可能想问,queue不是有封装size吗?哪这里我们还定义一个大小变量,会不会有些多余?
nope!实际上,这里的这个_size
就好比我们在C语言写顺序表的时候,成员capacity
;其作用是来判断我们的队列有没有满 的。
4.2 push和pop 对于一个队列,最重要的操作就是在队头出数据,队尾入数据
简单说来,就是需要在处理队列数据的时候进行加锁,保证原子性;
除此以外,生产者和消费者有不同的操作逻辑:
生产判断是否符合生产条件(队列没有满) 满,不生产;不满,生产; 满了的时候,生产者应在条件变量中等待(等待消费者消费) 不满的时候,生产者生产,并通知消费者来消费 消费判断是否满足消费条件(队列不为空) 空,不消费;不空,消费; 空了的时候,消费者应该在条件变量中等待(等待生产者生产) 不空的时候,消费者消费,并通知生产者继续生产 这样就实现了阻塞队列push和pop的基本逻辑;由此可以写出下面的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 T pop () { pthread_mutex_lock (&_mutex); if (_size) { pthread_cond_wait (&_conInf,&_mutex); } T tmp = _bq.front (); _bq.pop (); pthread_cond_signal (&_proInf); pthread_mutex_unlock (&_mutex); return tmp; } void Push (const T& in) { pthread_mutex_lock (&_mutex); if (bq.size ()>=_size) { pthread_cond_wait (&_proInf,&_mutex); } _bq.push (in); pthread_cond_signal (&_conInf); pthread_mutex_unlock (&_mutex); }
4.2.1 运行测试 有了这个基本框架,我们就可以来测试一下代码啦!
先来一个生产者和消费者康康吧
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 #include "blockqueue.hpp" #include <time.h> #include <stdlib.h> void *consume (void *args) { BlockQueue<int > *bqp = (BlockQueue<int > *)args; while (1 ) { int ret = bqp->pop (); cout << "consume " << pthread_self () << " 消费:" << ret << endl; sleep (1 ); } } void *produce (void *args) { BlockQueue<int > *bqp = (BlockQueue<int > *)args; while (1 ) { cout << "########↓" << endl; int a = rand ()%100 ; bqp->push (a); cout << "produce " << pthread_self () << " 生产:" << a << endl; sleep (2 ); } } int main () { srand ((unsigned int )time (nullptr )); pthread_t t1,t2; BlockQueue<int > bq (5 ) ; pthread_create (&t1,nullptr ,produce,(void *)&bq); pthread_create (&t2,nullptr ,consume,(void *)&bq); pthread_join (t1,nullptr ); pthread_join (t2,nullptr ); return 0 ; }
可以看到,刚开始消费者并没有运行,而是等待生产者生产出数据了之后,再开始消费!我们的目的成功达成!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 [muxue@bt-7274:~/git/linux/code/23-01-08 blockqueue]$ ./test ########↓ produce 139716365768448 生产:41 ########↓ produce 139716365768448 生产:11 consume 139716357375744 消费:41 ########↓ produce 139716365768448 生产:19 consume 139716357375744 消费:11 ########↓ produce 139716365768448 生产:19 consume 139716357375744 消费:19 ########↓ produce 139716365768448 生产:25 consume 139716357375744 消费:19 ########↓ produce 139716365768448 生产:63 consume 139716357375744 消费:25
如果增加线程到2生产2消费,则会看到下面的情况
1 2 3 4 5 6 7 8 9 10 [muxue@bt-7274:~/git/linux/code/23-01-08 blockqueue]$ ./test ########↓ produce 140483816179456 生产:51 consume 140483807786752 消费:51 ########↓ produce 140483799394048 生产:38 consume 140483791001344 消费:38 ########↓ produce 140483816179456 生产:37 consume 140483807786752 消费:37
每次被唤醒的生产者和消费者都是不一样的,交替唤醒
4.2.2 进一步封装 为了代码的可读性,我们可以对阻塞队列进一步封装
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 #include <iostream> #include <queue> #include <pthread.h> #include <unistd.h> using namespace std;template <class T >class BlockQueue { private : queue<T> _bq; size_t _size; pthread_mutex_t _mutex; pthread_cond_t _proInf; pthread_cond_t _conInf; public : BlockQueue (int sz=5 ) :_size(sz) { pthread_mutex_init (&_mutex,nullptr ); pthread_cond_init (&_proInf,nullptr ); pthread_cond_init (&_conInf,nullptr ); } ~BlockQueue () { pthread_mutex_destroy (&_mutex); pthread_cond_destroy (&_proInf); pthread_cond_destroy (&_conInf); } T pop () { lock (); if (isEmpty ()) { ConWait (); } T tmp = _bq.front (); _bq.pop (); WakeUpPro (); unlock (); return tmp; } void push (const T& in) { lock (); if (isFull ()) { ProWait (); } _bq.push (in); WakeUpCon (); unlock (); } private : void lock () { pthread_mutex_lock (&_mutex); } void unlock () { pthread_mutex_unlock (&_mutex); } void WakeUpCon () { pthread_cond_signal (&_conInf); } void WakeUpPro () { pthread_cond_signal (&_proInf); } void ConWait () { pthread_cond_wait (&_conInf,&_mutex); } void ProWait () { pthread_cond_wait (&_proInf,&_mutex); } bool isFull () { return _size == _bq.size (); } bool isEmpty () { return _bq.empty (); } };
4.2.3 使用task分配运算任务 因为阻塞队列是用模板类型的,我们可以自己实现一个仿函数
,来给生产者消费者分配任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 #pragma once #include <iostream> using namespace std;class Task { public : Task (int one=0 , int two=0 , char op="+" ) : _elem1(one), _elem2(two), _operator(op) {} int operator () () { return run (); } int run () { int result = 0 ; switch (_operator) { case '+' : result = _elem1 + _elem2; break ; case '-' : result = _elem1 - _elem2; break ; case '*' : result = _elem1 * _elem2; break ; case '/' : { if (_elem2 == 0 ) { cout << "div zero, abort" << endl; result = -1 ; } else { result = _elem1 / _elem2; } break ; } case '%' : { if (_elem2 == 0 ) { cout << "mod zero, abort" << endl; result = -1 ; } else { result = _elem1 % _elem2; } break ; } default : cout << "unknown: " << _operator << endl; break ; } return result; } void get (int *e1, int *e2, char *op) { *e1 = _elem1; *e2 = _elem2; *op = _operator; } private : int _elem1; int _elem2; char _operator; };
测试一下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 #include "blockqueue.hpp" #include "task.hpp" #include <time.h> #include <stdlib.h> #include <sys/types.h> #include <unistd.h> #include <string> string ops ="+-*/%" ; void *consumer (void *args) { BlockQueue<Task> *bqp = (BlockQueue<Task> *)args; while (1 ) { Task t = bqp->pop (); int result = t (); int one, two; char op; t.get (&one, &two, &op); cout << "consumer [" << pthread_self () << "] " << (unsigned long )time (nullptr ) << " 消费了一个任务: " << one << op << two << " = " << result << endl; } } void *producer (void *args) { BlockQueue<Task> *bqp = (BlockQueue<Task> *)args; while (1 ) { int one = rand () % 50 ; int two = rand () % 20 ; char op = ops[rand () % ops.size ()]; Task t (one, two, op) ; bqp->push (t); cout << "producter[" << pthread_self () << "] " << (unsigned long )time (nullptr ) << " 生产了一个任务: " << one << op << two << " = ?" << endl; sleep (1 ); } } void test2 () { pthread_t t1,t2; BlockQueue<Task> bq (5 ) ; pthread_create (&t1,nullptr ,producer,(void *)&bq); sleep (1 ); pthread_create (&t2,nullptr ,consumer,(void *)&bq); pthread_join (t1,nullptr ); pthread_join (t2,nullptr ); } int main () { srand ((unsigned long )time (nullptr )); test2 (); return 0 ; }
运行,可以看到生产者生产了问题之后,消费者会去解答。此时我们只需要在线程中取回运算好的结果,就OK了!
1 2 3 4 5 6 7 8 9 10 11 [muxue@bt-7274:~/git/linux/code/23-01-08 blockqueue]$ ./test producter[140703422265088] 1673319321 生产了一个任务: 43-16 = ? consumer [140703413872384] 1673319322 消费了一个任务: 43-16 = 27 producter[140703422265088] 1673319322 生产了一个任务: 45/12 = ? consumer [140703413872384] 1673319322 消费了一个任务: 45/12 = 3 producter[140703422265088] 1673319323 生产了一个任务: 17/7 = ? consumer [140703413872384] 1673319323 消费了一个任务: 17/7 = 2 producter[140703422265088] 1673319324 生产了一个任务: 49-14 = ? consumer [140703413872384] 1673319324 消费了一个任务: 49-14 = 35 producter[140703422265088] 1673319325 生产了一个任务: 4%4 = ? consumer [140703413872384] 1673319325 消费了一个任务: 4%4 = 0
5.阻塞队列-循环队列 5.0 为啥用循环队列 上面的队列是封装了queue,下面我们要利用环形队列的方式来实现一个功能相同的环形队列。
所谓循环队列,就是在一个一维数组中,通过头尾两个指针,来标识队列的头尾。如果数据数量超出空间末尾,则在空间的开头放入,并移动尾指针
这部分可以看我的博客 循环队列
用上循环队列,就有一个显著的优势:因为我们访问的(假设是数组)是不同下标位置,其并非同一块内存空间 ,所以是可以同时访问的!这样就进一步显现了生产消费的并发属性
这就相当于把循环队列这个临界资源分成了一小块一小块 ;只有满/空的时候,头尾指针会指向同一块空间,其余时间都是不冲突的!
注意 :这需要程序猿来保证 ,获取了信号量之后,访问的肯定是临界资源中的不同区域,否则是会出问题的!
5.1 POSIX信号量 这里又要重新认识一下信号量了,在先前的博客 中,简单提到了信号量的概念,其本质上是一个计数器 。
对信号量的操作是原子的,不会因为线程切换而发生错误和冲突。
循环队列需要有两个标识符,来标识当前数据的头尾。注意,信号量可不能做下标 使用,这里我们可以用int
类型,加锁来解决原子性问题;
信号量在环形队列中的作用,还是用于标识 空间剩余/数据数量
因为这是一个阻塞队列:
生产者放入数据,对应的是空间信号量,只有有空间的时候,才能往环形队列里面放入 消费者取出数据,对应的是数据信号量,没有数据也就不能取了 此时,先前介绍的semop
函数在此环节不太适合,在此介绍两个来自pthread
库的新接口;这些接口在编译的时候都需要带上-lpthread
5.1.1 sem_init/destroy 1 2 3 4 #include <semaphore.h> int sem_init (sem_t *sem, int pshared, unsigned int value) ;int sem_destroy (sem_t *sem) ;
因为都是pthread库的,其使用方法还是很相似的,我们需要对信号量进行初始化,并给定一个value作为信号量的初始值;
对init
函数的pshared
参数做一定讲解,在man手册中的介绍是这样的
如果我们的信号量需要在线程中共享,那就将该参数设置为0;
如果信号量需要在进程中共享,其就应该处在共享内存区域,可以对添加了共享内存的进程之间共享;
阻塞队列是给线程使用的,所以我们设置成0就可以了。
5.1.2 sem_wait 这个接口和锁/条件变量的wait是一样的,其作用是申请一个信号量,如果信号量为0,则在此处等待,直到信号量非0
1 2 3 4 5 6 7 8 9 10 NAME sem_wait, sem_timedwait, sem_trywait - lock a semaphore SYNOPSIS #include <semaphore.h> int sem_wait(sem_t *sem); int sem_trywait(sem_t *sem); int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout); Link with -pthread.
可以简单理解为,这个接口的作用就是让信号量-1
trywait
则是非阻塞检测;timewait
是设置等待的时间,超时就不等了。这点和条件变量的接口是一样的
5.1.3 sem_post 这个接口的作用就是释放一个信号量,相当于给这个信号量+1
1 2 3 4 5 6 NAME sem_post - unlock a semaphore SYNOPSIS #include <semaphore.h> int sem_post(sem_t *sem);
5.1.4 在循环队列里面使用 知道了这几个接口,对于我们环形队列的使用就比较明了了
当生产者开始生产的时候,申请一个空间信号量(剩余空间-1)生产完毕后,释放一个数量信号量(剩余数量+1) 当消费者开始消费的时候,申请一个数量信号量(剩余数量-1)生产完毕后,释放一个空间信号量(剩余空间+1) 除了这两个信号量,生产消费的时候,还需要操作头尾指针,指向队列正确的位置
5.2 成员变量 前面铺垫了那么多,现在就可以来试试水了!
1 2 3 4 5 6 7 8 vector<T> _rq; sem_t _spaceSem;sem_t _valueSem;pthread_mutex_t _proMutex;pthread_mutex_t _conMutex;size_t _rear;size_t _front;
因为信号量就可以充当条件变量的角色,所以这里就不需要条件变量来通知生产者/消费者了
5.3 构造/析构 接下来要做的,就是写构造/析构了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 RingQueue (int capa = 5 ) :_rq(capa), _rear(0 ), _front(0 ) { sem_init (&_spaceSem,0 ,capa); sem_init (&_valueSem,0 ,0 ); pthread_mutex_init (&_proMutex,nullptr ); pthread_mutex_init (&_conMutex,nullptr ); } ~RingQueue () { sem_destroy (&_spaceSem); sem_destroy (&_valueSem); pthread_mutex_destroy (&_proMutex); pthread_mutex_destroy (&_conMutex); }
5.4 生产消费 首先我们需要知道,当队列满/空的时候,分别对应啥情况
满:生产太多了,生产者得休息 空:消费太多了,消费者得缓缓 因为有信号量帮我们做了访问控制 ,所以我们不需要判断 循环队列什么时候为满,什么时候为空:
满的时候:数量=队列容量,空间信号量=0,无法申请空间,无法生产 此时生产者会在空间信号量 里面等待,不会继续生产;消费者继续消费 空的时候:空间=队列容量,数量信号量=0,没有可以消费的 此时消费者会在数量信号量 里面等待,不会继续消费;生产者继续生产 这也是信号量作为访问控制的一大特征,当你申请成功了,就代表你肯定有临界资源的访问权限了 ;再加上我们给访问临界区加了锁,自然也不会出现被其他线程抢了的情况🎉
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 void Push (T& in) { sem_wait (&_spaceSem); pthread_mutex_lock (&_proMutex); _rq[_rear] = in; _rear++; _rear %= _rq.size (); pthread_mutex_unlock (&_proMutex); sem_post (&_valueSem); } T pop () { sem_wait (&_valueSem); pthread_mutex_lock (&_conMutex); T tmp = _rq[_front]; _front++; _front %= _rq.size (); pthread_mutex_unlock (&_conMutex); sem_post (&_spaceSem); return tmp; }
5.5 测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 string ops ="+-*/%" ; void *consumer (void *args) { RingQueue<Task> *bqp = (RingQueue<Task> *)args; while (1 ) { Task t = bqp->pop (); int result = t (); int one, two; char op; t.get (&one, &two, &op); cout << "consumer [" << pthread_self () << "] " << (unsigned long )time (nullptr ) << " 消费了一个任务: " << one << op << two << " = " << result << endl; } } void *producer (void *args) { RingQueue<Task> *bqp = (RingQueue<Task> *)args; while (1 ) { int one = rand () % 50 ; int two = rand () % 20 ; char op = ops[rand () % ops.size ()]; Task t (one, two, op) ; bqp->push (t); cout << "producter[" << pthread_self () << "] " << (unsigned long )time (nullptr ) << " 生产了一个任务: " << one << op << two << " = ?" << endl; sleep (1 ); } } void test2 () { pthread_t t1,t2; RingQueue<Task> bq (5 ) ; pthread_create (&t1,nullptr ,producer,(void *)&bq); sleep (1 ); pthread_create (&t2,nullptr ,consumer,(void *)&bq); pthread_join (t1,nullptr ); pthread_join (t2,nullptr ); } int main () { srand ((unsigned long )time (nullptr )); test2 (); return 0 ; }
刚开始运行的时候,发现了这个错误
这是因为在task中,我们重写了构造函数,编译器就不会生成默认构造了
1 2 3 Task(int one, int two, char op) : _elem1(one), _elem2(two), _operator(op) {}
而创建环形队列的时候,会自动调用构造函数 。此时发现没有匹配的构造函数(无参),就会报错!
解决办法是,添加上一个无参构造,直接使用default
关键字即可
运行,可以看到生产消费稳定跑起来了!
1 2 3 4 5 6 7 8 9 [muxue@bt-7274:~/git/linux/code/23-01-17 ringqueue]$ ./test producter[139758380586752] 1673936332 生产了一个任务: 24-8 = ? producter[139758380586752] 1673936333 生产了一个任务: 34/19 = ? consumer [139758372194048] 1673936333 消费了一个任务: 24-8 = 16 consumer [139758372194048] 1673936333 消费了一个任务: 34/19 = 1 producter[139758380586752] 1673936334 生产了一个任务: 18*16 = ? consumer [139758372194048] 1673936334 消费了一个任务: 18*16 = 288 producter[139758380586752] 1673936335 生产了一个任务: 48+15 = ? consumer [139758372194048] 1673936335 消费了一个任务: 48+15 = 63
增多线程,也能正常运行!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 [muxue@bt-7274:~/git/linux/code/23-01-17 ringqueue]$ ./test producter[140344921630464] 1673936659 生产了一个任务: 44-15 = ? consumer [140344913237760] 1673936660 消费了一个任务: 44-15 = 29 producter[140344921630464] 1673936660 生产了一个任务: 33+14 = ? consumer [140344913237760] 1673936660 消费了一个任务: 33+14 = 47 producter[140344904845056] 1673936661 生产了一个任务: 14/12 = ? consumer [140344896452352] 1673936661 消费了一个任务: 14/12 = 1 producter[140344921630464] 1673936661 生产了一个任务: 24-16 = ? consumer [140344913237760] 1673936661 消费了一个任务: 24-16 = 8 producter[140344904845056] 1673936662 生产了一个任务: 3-3 = ? consumer [140344896452352] 1673936662 消费了一个任务: 3-3 = 0 producter[140344921630464] 1673936662 生产了一个任务: 36*7 = ? consumer [140344913237760] 1673936662 消费了一个任务: 36*7 = 252 producter[140344904845056] 1673936663 生产了一个任务: 28+8 = ? consumer [140344896452352] 1673936663 消费了一个任务: 28+8 = 36 producter[140344921630464] 1673936663 生产了一个任务: 26-5 = ? consumer [140344913237760] 1673936663 消费了一个任务: 26-5 = 21
结语 关于线程同步的知识点,大概就是这些了。博客写的满满当当,在理解了接口的基本命名和使用逻辑后,感觉就没有那么难了
加油哦!