【Linux】线程概念 | 同步
慕雪年华

除了线程互斥,我们还有线程同步,来康康吧

[TOC]

1.为什么需要同步

在部分条件下,互斥是正确的,但是不合理。比如食堂打饭的时候,食堂阿姨一次只能给一个人打饭,于是就选择通过竞争来获得打饭的权利。此时就会出现有些线程因为优先级过低或者CPU调度问题,一直打不到饭,于是就出现了饥饿问题

这是因为我们对多线程访问同一个资源没有进行限制,全靠CPU调度来决定运行顺序;所以我们需要对线程进行一定的控制,这就是线程同步的概念

  • 饥饿问题:某一个线程一直无法申请到某种资源(比如因为优先级过低)
  • 同步:保证数据安全(临界资源访问)的前提下,让线程根据一定条件和顺序访问临界资源,从而避免饥饿问题
  • 竞态条件:因为时序问题(CPU调度)而导致程序异常;

2.生产消费模型

这个模型其实很简单,消费者去超市购买东西,生产者把商品投放到超市中。

  • 这时候就不需要消费者直接去找工厂问他xx东西又没有生产,他需要购买;而是转去超市里面购买xx东西;
  • 如果xx东西没有货了,超市就会通知生产者进行补货。如果超市里面的货架已经满了,就通知生产者不需要继续生产了;
  • 当商品没货了,超市会告知消费者这个东西没货,消费者会停止消费行为;而生产者补货了之后,超市就会通知消费者让他来购买

image

在基础模式中,消费者要想知道一个东西有没有货,需要去超市里面看(相当于轮循检测)

我们可以引入一个通知方式,比如超市开放一个微信公众号,告知消费者xx物品是否有货,以及告知消费者什么时候需要补货,此时就不需要消费者和生产者不断询问超市关于一个商品的情况了!这就相当于线程同步

2.1 生产者和消费者的关系

下面提到的是普适情况

  • 消费者有多个,他们之间是竞争关系(互斥)竞争商品的购买
  • 生产者有多个,他们之间是竞争关系(互斥)竞争超市的货架
  • 消费者和生产者之间,既有互斥关系,也是同步关系(需要生产者供货了之后,消费者才能消费)这两个关系并不冲突!

除了上面提到的3种生产关系,还有下面俩点

在实际程序中,消费者和生产者都是由线程承担的(2种角色

超市是内存中特定的一个数据结构,也是临界资源(1个交易场所

我们可以用321原则来快速记住这几条,这样就记住了生产消费模型!👍

2.2 以简单代码为例

在旧模式中,main函数调用另外一个函数,想获得返回值,需要等这个函数运行完毕;好比消费者购买东西,需要去找厂家并等待厂家生产……

image

而在生产消费者模型中,main作为主线程,只需要把待处理的数据丢进缓冲区;而线程B从缓冲区中取出数据,处理完毕后放回缓冲区。main可以先执行其他代码,过一会再过来拿线程B处理好的结果。

这就实现了生产和消费的解耦

image

2.3 并发

生产消费模型的并发,更多的体现在消费者在处理任务的同时,生产者可以生产任务;

线程切换的成本低于进程,由此便提高了数据处理的效率


接下来我们就要解决下面这些问题😁

1
2
3
1.如何让多个消费者线程等待呢?又如何让消费者线程被唤醒呢?
2.如何让多个生产者线程等待呢?又如何让生产者线程被唤醒呢?
3.如何衡量消费者和生产者所关心的条件是否就绪呢?

而前面提到的通知方式,在linux系统中,就是条件变量了!

3.条件变量接口

3.1 init/destroy

基本的接口和pthread库的其他接口很相似,都是一样的用法;其中attr也是设置条件变量的属性,这里置为nullptr即可

1
2
3
4
5
6
#include <pthread.h>

int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

3.2 pthread_cond_wait

这两个接口都是让线程在一个条件变量下进行等待,其中timewait接口可以设置等待的时间(超时了就不等了)

条件变量也是临界资源,所以这里需要一个mutex锁来保证条件变量读写的原子性

1
2
3
4
5
6
7
#include <pthread.h>

int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex,
const struct timespec *restrict abstime);
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);

3.3 pthrea_cond_signal/broadcast

这个接口的作用是给在条件变量下等待的线程发信号

1
2
3
4
#include <pthread.h>

int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);

其中broadcast是给在当前条件变量等待的所有线程发信号,而signal是发送信号,只唤醒一个线程;

如果调用成功,这两个函数都会返回0;否则返回错误码

1
2
3
RETURN VALUE
If successful, the pthread_cond_broadcast() and pthread_cond_signal() functions shall return zero; otherwise, an error number shall
be returned to indicate the error.

3.4 代码示例

下面这个代码可以很好的演示上面提到的多个接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#include<iostream>
#include<string.h>
#include<signal.h>
#include<pthread.h>
#include<thread>
#include<unistd.h>
#include<sys/types.h>
#include<sys/syscall.h>
using namespace std;

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;//锁
pthread_cond_t cond;//条件变量

volatile bool quit = false;

void*func(void* arg)
{
while(!quit)//这里有bug,后续会提到
{
pthread_cond_wait(&cond,&mutex);
cout << "thread is running... " << (char*)arg << endl;
}
cout << "thread quit... " << (char*)arg << endl;
}

int main()
{
pthread_cond_init(&cond,nullptr);
pthread_t t1,t2,t3;
pthread_create(&t1,nullptr,func,(void*)"t1");
pthread_create(&t2,nullptr,func,(void*)"t2");
pthread_create(&t3,nullptr,func,(void*)"t3");

char c;
while(1)
{
cout << "[a/b]$ ";
cin >> c;
if(c=='a')
{
pthread_cond_signal(&cond);
}
else if(c=='b')
{
pthread_cond_broadcast(&cond);
}
else
{
quit = true;
break;
}
usleep(500);
}

cout << "main break: " << quit << endl;
sleep(1);
pthread_cond_broadcast(&cond);

pthread_join(t1,nullptr);
pthread_join(t2,nullptr);
pthread_join(t3,nullptr);

return 0;
}

每次输入a,就signal会唤醒一个线程;每次输入b,会调用broadcast,唤醒当前所有线程

1
2
3
4
5
6
7
8
9
10
11
[muxue@bt-7274:~/git/linux/code/22-12-25_线程同步]$ ./test
[a/b]$ a
thread is running... t1
[a/b]$ a
thread is running... t2
[a/b]$ a
thread is running... t3
[a/b]$ b
thread is running... t1
thread is running... t2
thread is running... t3

3.4.1 小bug

上面的代码示例,会出现下面的问题,即我们输入除了a和b以外的所有字符,都应该会把全局变量quit改成true,让三个线程都退出

但观察到的现象却是只有一个线程退出了,其他线程阻塞等待了

1
2
3
4
5
6
[muxue@bt-7274:~/git/linux/code/22-12-25_线程同步]$ ./test
[a/b]$ q
main break: 1
thread is running... t3
thread quit... t3

这是因为pthread_cond_wait里面进行了独特的操作,即等待之前,它会释放锁,等待之后,他会重新申请锁

1
2
3
4
5
6
7
8
9
10
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex)
{
pthread_mutex_unlock(mutex);//先解锁
//避免因为该线程拿着锁去休眠了,导致其他线程无法申请该锁

//条件变量相关代码

pthread_mutex_lock(mutex);//条件满足后,再加锁
}

第一个退出的线程,退出之前申请了锁却没有释放,于是就导致其他线程在条件满足后,没有办法申请锁,只能阻塞等待!

3.4.2 修正

修正的方法很简单,我们只需要在while(!quit)循环的退出条件满足之后,释放一下锁,就OK了!

1
2
3
4
5
6
7
8
9
10
11
void*func(void* arg)
{
while(!quit)
{
pthread_cond_wait(&cond,&mutex);
cout << "thread is running... " << (char*)arg << endl;

}
pthread_mutex_unlock(&mutex);//正确操作:需要在条件满足后,解锁
cout << "thread quit... " << (char*)arg << endl;
}

此时就能看到,所有线程都正常退出了!

1
2
3
4
5
6
7
8
9
10
[muxue@bt-7274:~/git/linux/code/22-12-25_线程同步]$ ./test
[a/b]$ q
main break: 1
thread is running... t1
thread quit... t1
thread is running... t2
thread quit... t2
thread is running... t3
thread quit... t3
[muxue@bt-7274:~/git/linux/code/22-12-25_线程同步]$

3.4.3 典型错误

根据这点,就能引出一个比较典型的错误

1
2
3
4
5
6
7
pthread_mutex_lock(&mutex);
while (condition_is_false) {
pthread_mutex_unlock(&mutex);
pthread_cond_wait(&cond);//解锁和加锁的操作,该函数会帮我们完成
pthread_mutex_lock(&mutex);//二次申请同一把锁,出现死锁!
}
pthread_mutex_unlock(&mutex);

两次申请同一把锁,就好比自己把自己绊倒了😂我们要避免写出这样的错误代码!


4.阻塞队列-生产消费模型实例

这个队列的作用,就是提供一个超市,供生产者和消费者进行数据的交换

  • 生产者,往队列里面push数据
  • 消费者,从队列里面pop数据

看起来有些类似于管道,同样的,生产者和消费者在读取阻塞队列的时候,不仅需要保证自己的操作是原子操作,还需要做到一定的访问控制;即消费者在队列空的时候不能继续pop,生产者在队列满的时候不能继续push

此时,我们还可以引入一个微信公众号,也就是一定的通知方式:不要让生产者、消费者疯狂检测阻塞队列,而是引入条件变量,在队列不为空的时候,通知消费者;在队列不为满的时候,通知生产者;这样就达到了线程之间的同步。

4.1 成员变量

要实现阻塞队列,我们首先需要理清楚需要什么成员变量,来保护该队列

  • 用于访问控制的锁,同一时刻只能有一个线程访问队列
  • 用户线程同步的条件变量,因为我们需要在不同的条件下通知不同的人,所以需要2个条件变量
  • 一个队列,为了方便,采用std::queue,这样就不用自己造轮子了

理清楚了之后,就可以来写成员变量啦;我采用了模板类型,这样阻塞队列就可以用来存放任何我们想要的类型了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
template<class T>
class BlockQueue
{
private:
queue<T> _bq;//队列
size_t _size;//大小
pthread_mutex_t _mutex;//锁
pthread_cond_t _proInf;//通知生产者
pthread_cond_t _conInf;//通知消费者
public:
BlockQueue(int sz=5)
:_size(sz)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_proInf,nullptr);
pthread_cond_init(&_conInf,nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_proInf);
pthread_cond_destroy(&_conInf);
}
};

你可能想问,queue不是有封装size吗?哪这里我们还定义一个大小变量,会不会有些多余?

nope!实际上,这里的这个_size 就好比我们在C语言写顺序表的时候,成员capacity;其作用是来判断我们的队列有没有满的。


4.2 push和pop

对于一个队列,最重要的操作就是在队头出数据,队尾入数据

简单说来,就是需要在处理队列数据的时候进行加锁,保证原子性;

除此以外,生产者和消费者有不同的操作逻辑:

  • 生产
    • 判断是否符合生产条件(队列没有满)
    • 满,不生产;不满,生产;
    • 满了的时候,生产者应在条件变量中等待(等待消费者消费)
    • 不满的时候,生产者生产,并通知消费者来消费
  • 消费
    • 判断是否满足消费条件(队列不为空)
    • 空,不消费;不空,消费;
    • 空了的时候,消费者应该在条件变量中等待(等待生产者生产)
    • 不空的时候,消费者消费,并通知生产者继续生产

这样就实现了阻塞队列push和pop的基本逻辑;由此可以写出下面的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//消费者消费
T pop()
{
//加锁
pthread_mutex_lock(&_mutex);
//判断条件
if(_size)//空,不消费
{
pthread_cond_wait(&_conInf,&_mutex);
}

//消费并通知生产者
T tmp = _bq.front();
_bq.pop();
pthread_cond_signal(&_proInf);

//解锁
pthread_mutex_unlock(&_mutex);
return tmp;
}
//生产者生产
void Push(const T& in)
{
//加锁
pthread_mutex_lock(&_mutex);
//判断条件
if(bq.size()>=_size)//满,不生产
{
pthread_cond_wait(&_proInf,&_mutex);
}

//生产并通知消费者
_bq.push(in);
pthread_cond_signal(&_conInf);

//解锁
pthread_mutex_unlock(&_mutex);
}

4.2.1 运行测试

有了这个基本框架,我们就可以来测试一下代码啦!

先来一个生产者和消费者康康吧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include "blockqueue.hpp"
#include <time.h>
#include <stdlib.h>

void *consume(void *args)
{
BlockQueue<int> *bqp = (BlockQueue<int> *)args;
while(1)
{
// 消费
int ret = bqp->pop();
cout << "consume " << pthread_self() << " 消费:" << ret << endl;
sleep(1);
}
}
void *produce(void *args)
{
BlockQueue<int> *bqp = (BlockQueue<int> *)args;
while (1)
{
// 制作
cout << "########↓" << endl;
int a = rand()%100;
// 投放到超市
bqp->push(a);
cout << "produce " << pthread_self() << " 生产:" << a << endl;
sleep(2);
}
}

int main()
{
srand((unsigned int)time(nullptr));
pthread_t t1,t2;
BlockQueue<int> bq(5);
pthread_create(&t1,nullptr,produce,(void*)&bq);
pthread_create(&t2,nullptr,consume,(void*)&bq);

pthread_join(t1,nullptr);
pthread_join(t2,nullptr);

return 0;
}

可以看到,刚开始消费者并没有运行,而是等待生产者生产出数据了之后,再开始消费!我们的目的成功达成!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[muxue@bt-7274:~/git/linux/code/23-01-08 blockqueue]$ ./test
########↓
produce 139716365768448 生产:41
########↓
produce 139716365768448 生产:11
consume 139716357375744 消费:41
########↓
produce 139716365768448 生产:19
consume 139716357375744 消费:11
########↓
produce 139716365768448 生产:19
consume 139716357375744 消费:19
########↓
produce 139716365768448 生产:25
consume 139716357375744 消费:19
########↓
produce 139716365768448 生产:63
consume 139716357375744 消费:25

如果增加线程到2生产2消费,则会看到下面的情况

1
2
3
4
5
6
7
8
9
10
[muxue@bt-7274:~/git/linux/code/23-01-08 blockqueue]$ ./test
########↓
produce 140483816179456 生产:51
consume 140483807786752 消费:51
########↓
produce 140483799394048 生产:38
consume 140483791001344 消费:38
########↓
produce 140483816179456 生产:37
consume 140483807786752 消费:37

每次被唤醒的生产者和消费者都是不一样的,交替唤醒

4.2.2 进一步封装

为了代码的可读性,我们可以对阻塞队列进一步封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#include<iostream>
#include<queue>
#include<pthread.h>
#include<unistd.h>
using namespace std;

template<class T>
class BlockQueue
{
private:
queue<T> _bq;//队列
size_t _size;//大小
pthread_mutex_t _mutex;//锁
pthread_cond_t _proInf;//通知生产者
pthread_cond_t _conInf;//通知消费者
public:
BlockQueue(int sz=5)
:_size(sz)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_proInf,nullptr);
pthread_cond_init(&_conInf,nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_proInf);
pthread_cond_destroy(&_conInf);
}
//消费者消费
T pop()
{
//加锁
lock();
//判断条件
if(isEmpty())//空,不消费
{
ConWait();
}

//消费并通知生产者
T tmp = _bq.front();
_bq.pop();
WakeUpPro();

//解锁
unlock();
return tmp;
}
//生产者生产
void push(const T& in)
{
//加锁
lock();
//判断条件
if(isFull())//满,不生产
{
ProWait();
}

//生产并通知消费者
_bq.push(in);
WakeUpCon();

//解锁
unlock();
}
private:
void lock()
{
pthread_mutex_lock(&_mutex);
}
void unlock()
{
pthread_mutex_unlock(&_mutex);
}
//唤醒消费者
void WakeUpCon()
{
pthread_cond_signal(&_conInf);
}
//唤醒生产者
void WakeUpPro()
{
pthread_cond_signal(&_proInf);
}
//消费者等待
void ConWait()
{
pthread_cond_wait(&_conInf,&_mutex);
}
//生产者等待
void ProWait()
{
pthread_cond_wait(&_proInf,&_mutex);
}

//判断条件
bool isFull()
{
return _size == _bq.size();
}
bool isEmpty()
{
return _bq.empty();
}
};

4.2.3 使用task分配运算任务

因为阻塞队列是用模板类型的,我们可以自己实现一个仿函数,来给生产者消费者分配任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#pragma once

#include <iostream>
using namespace std;

class Task
{
public:
Task(int one=0, int two=0, char op="+")
: _elem1(one), _elem2(two), _operator(op)
{}
// 仿函数
int operator() ()
{
return run();
}
// 运行仿函数
int run()
{
int result = 0;
switch (_operator)
{
case '+':
result = _elem1 + _elem2;
break;
case '-':
result = _elem1 - _elem2;
break;
case '*':
result = _elem1 * _elem2;
break;
case '/':
{
if (_elem2 == 0)
{
cout << "div zero, abort" << endl;
result = -1;
}
else
{
result = _elem1 / _elem2;
}
break;
}
case '%':
{
if (_elem2 == 0)
{
cout << "mod zero, abort" << endl;
result = -1;
}
else
{
result = _elem1 % _elem2;
}
break;
}
default:
cout << "unknown: " << _operator << endl;
break;
}
return result;
}

// 获取元素,方便打印
void get(int *e1, int *e2, char *op)
{
*e1 = _elem1;
*e2 = _elem2;
*op = _operator;
}
private:
int _elem1;
int _elem2;
char _operator;
};

测试一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#include "blockqueue.hpp"
#include "task.hpp"
#include <time.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <string>
string ops ="+-*/%";

void *consumer(void *args)
{
BlockQueue<Task> *bqp = (BlockQueue<Task> *)args;
while (1)
{
Task t = bqp->pop();
int result = t(); //处理任务
int one, two;
char op;
t.get(&one, &two, &op);
cout << "consumer [" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 消费了一个任务: " << one << op << two << " = " << result << endl;
}
}
void *producer(void *args)
{
BlockQueue<Task> *bqp = (BlockQueue<Task> *)args;
while (1)
{
// 制作任务
int one = rand() % 50;
int two = rand() % 20;
char op = ops[rand() % ops.size()];
Task t(one, two, op);
// 投放给消费者生产
bqp->push(t);
cout << "producter[" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 生产了一个任务: " << one << op << two << " = ?" << endl;
sleep(1);
}
}

void test2()
{
pthread_t t1,t2;
BlockQueue<Task> bq(5);
pthread_create(&t1,nullptr,producer,(void*)&bq);
sleep(1);
pthread_create(&t2,nullptr,consumer,(void*)&bq);

pthread_join(t1,nullptr);
pthread_join(t2,nullptr);
}

int main()
{
srand((unsigned long)time(nullptr));//乘一个数字添加随机性
test2();

return 0;
}

运行,可以看到生产者生产了问题之后,消费者会去解答。此时我们只需要在线程中取回运算好的结果,就OK了!

1
2
3
4
5
6
7
8
9
10
11
[muxue@bt-7274:~/git/linux/code/23-01-08 blockqueue]$ ./test
producter[140703422265088] 1673319321 生产了一个任务: 43-16 = ?
consumer [140703413872384] 1673319322 消费了一个任务: 43-16 = 27
producter[140703422265088] 1673319322 生产了一个任务: 45/12 = ?
consumer [140703413872384] 1673319322 消费了一个任务: 45/12 = 3
producter[140703422265088] 1673319323 生产了一个任务: 17/7 = ?
consumer [140703413872384] 1673319323 消费了一个任务: 17/7 = 2
producter[140703422265088] 1673319324 生产了一个任务: 49-14 = ?
consumer [140703413872384] 1673319324 消费了一个任务: 49-14 = 35
producter[140703422265088] 1673319325 生产了一个任务: 4%4 = ?
consumer [140703413872384] 1673319325 消费了一个任务: 4%4 = 0

5.阻塞队列-循环队列

5.0 为啥用循环队列

上面的队列是封装了queue,下面我们要利用环形队列的方式来实现一个功能相同的环形队列。

所谓循环队列,就是在一个一维数组中,通过头尾两个指针,来标识队列的头尾。如果数据数量超出空间末尾,则在空间的开头放入,并移动尾指针

这部分可以看我的博客 循环队列

用上循环队列,就有一个显著的优势:因为我们访问的(假设是数组)是不同下标位置,其并非同一块内存空间,所以是可以同时访问的!这样就进一步显现了生产消费的并发属性

这就相当于把循环队列这个临界资源分成了一小块一小块;只有满/空的时候,头尾指针会指向同一块空间,其余时间都是不冲突的!

注意:这需要程序猿来保证,获取了信号量之后,访问的肯定是临界资源中的不同区域,否则是会出问题的!

5.1 POSIX信号量

这里又要重新认识一下信号量了,在先前的博客中,简单提到了信号量的概念,其本质上是一个计数器

  • p操作:申请资源
  • v操作:归还资源

对信号量的操作是原子的,不会因为线程切换而发生错误和冲突。

循环队列需要有两个标识符,来标识当前数据的头尾。注意,信号量可不能做下标使用,这里我们可以用int类型,加锁来解决原子性问题;


信号量在环形队列中的作用,还是用于标识 空间剩余/数据数量

因为这是一个阻塞队列:

  • 生产者放入数据,对应的是空间信号量,只有有空间的时候,才能往环形队列里面放入
  • 消费者取出数据,对应的是数据信号量,没有数据也就不能取了

此时,先前介绍的semop函数在此环节不太适合,在此介绍两个来自pthread库的新接口;这些接口在编译的时候都需要带上-lpthread

5.1.1 sem_init/destroy

1
2
3
4
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);

int sem_destroy(sem_t *sem);

因为都是pthread库的,其使用方法还是很相似的,我们需要对信号量进行初始化,并给定一个value作为信号量的初始值;

init函数的pshared参数做一定讲解,在man手册中的介绍是这样的

image

如果我们的信号量需要在线程中共享,那就将该参数设置为0;

如果信号量需要在进程中共享,其就应该处在共享内存区域,可以对添加了共享内存的进程之间共享;

阻塞队列是给线程使用的,所以我们设置成0就可以了。

5.1.2 sem_wait

这个接口和锁/条件变量的wait是一样的,其作用是申请一个信号量,如果信号量为0,则在此处等待,直到信号量非0

1
2
3
4
5
6
7
8
9
10
NAME
sem_wait, sem_timedwait, sem_trywait - lock a semaphore

SYNOPSIS
#include <semaphore.h>
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);

Link with -pthread.

可以简单理解为,这个接口的作用就是让信号量-1

trywait则是非阻塞检测;timewait是设置等待的时间,超时就不等了。这点和条件变量的接口是一样的

5.1.3 sem_post

这个接口的作用就是释放一个信号量,相当于给这个信号量+1

1
2
3
4
5
6
NAME
sem_post - unlock a semaphore

SYNOPSIS
#include <semaphore.h>
int sem_post(sem_t *sem);

5.1.4 在循环队列里面使用

知道了这几个接口,对于我们环形队列的使用就比较明了了

  • 当生产者开始生产的时候,申请一个空间信号量(剩余空间-1)生产完毕后,释放一个数量信号量(剩余数量+1)
  • 当消费者开始消费的时候,申请一个数量信号量(剩余数量-1)生产完毕后,释放一个空间信号量(剩余空间+1)

除了这两个信号量,生产消费的时候,还需要操作头尾指针,指向队列正确的位置

5.2 成员变量

前面铺垫了那么多,现在就可以来试试水了!

1
2
3
4
5
6
7
8
vector<T> _rq;//队列
sem_t _spaceSem;//空间信号量
sem_t _valueSem;//数据信号量
pthread_mutex_t _proMutex;//生产者的锁
pthread_mutex_t _conMutex;//消费者的锁

size_t _rear;//尾指针
size_t _front;//头指针

因为信号量就可以充当条件变量的角色,所以这里就不需要条件变量来通知生产者/消费者了

5.3 构造/析构

接下来要做的,就是写构造/析构了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
RingQueue(int capa = 5)
:_rq(capa),
_rear(0),
_front(0)
{
sem_init(&_spaceSem,0,capa);//空间
sem_init(&_valueSem,0,0);//数量为0

pthread_mutex_init(&_proMutex,nullptr);
pthread_mutex_init(&_conMutex,nullptr);
}
~RingQueue()
{
sem_destroy(&_spaceSem);
sem_destroy(&_valueSem);

pthread_mutex_destroy(&_proMutex);
pthread_mutex_destroy(&_conMutex);
}

5.4 生产消费

首先我们需要知道,当队列满/空的时候,分别对应啥情况

  • 满:生产太多了,生产者得休息
  • 空:消费太多了,消费者得缓缓

因为有信号量帮我们做了访问控制,所以我们不需要判断循环队列什么时候为满,什么时候为空:

  • 满的时候:数量=队列容量,空间信号量=0,无法申请空间,无法生产
  • 此时生产者会在空间信号量里面等待,不会继续生产;消费者继续消费
  • 空的时候:空间=队列容量,数量信号量=0,没有可以消费的
  • 此时消费者会在数量信号量里面等待,不会继续消费;生产者继续生产

这也是信号量作为访问控制的一大特征,当你申请成功了,就代表你肯定有临界资源的访问权限了;再加上我们给访问临界区加了锁,自然也不会出现被其他线程抢了的情况🎉

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//生产者
void Push(T& in)
{
sem_wait(&_spaceSem);//获取空间
pthread_mutex_lock(&_proMutex);

_rq[_rear] = in; //生产
_rear++; // 写入位置后移
_rear %= _rq.size(); // 更新下标,避免越界

pthread_mutex_unlock(&_proMutex);
sem_post(&_valueSem);//释放数量
}
//消费
T pop()
{
sem_wait(&_valueSem);
pthread_mutex_lock(&_conMutex);

T tmp = _rq[_front];
_front++;
_front %= _rq.size();// 更新下标,保证环形特征

pthread_mutex_unlock(&_conMutex);
sem_post(&_spaceSem);

return tmp;
}

5.5 测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
string ops ="+-*/%";

void *consumer(void *args)
{
RingQueue<Task> *bqp = (RingQueue<Task> *)args;
while (1)
{
Task t = bqp->pop();
int result = t(); //处理任务
int one, two;
char op;
t.get(&one, &two, &op);
cout << "consumer [" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 消费了一个任务: " << one << op << two << " = " << result << endl;
}
}
void *producer(void *args)
{
RingQueue<Task> *bqp = (RingQueue<Task> *)args;
while (1)
{
// 制作任务
int one = rand() % 50;
int two = rand() % 20;
char op = ops[rand() % ops.size()];
Task t(one, two, op);
// 投放给消费者生产
bqp->push(t);
cout << "producter[" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 生产了一个任务: " << one << op << two << " = ?" << endl;
sleep(1);
}
}

void test2()
{
pthread_t t1,t2;
RingQueue<Task> bq(5);
pthread_create(&t1,nullptr,producer,(void*)&bq);
sleep(1);
pthread_create(&t2,nullptr,consumer,(void*)&bq);

pthread_join(t1,nullptr);
pthread_join(t2,nullptr);
}

int main()
{
srand((unsigned long)time(nullptr));//乘一个数字添加随机性
test2();

return 0;
}

刚开始运行的时候,发现了这个错误

image

这是因为在task中,我们重写了构造函数,编译器就不会生成默认构造了

1
2
3
Task(int one, int two, char op) 
: _elem1(one), _elem2(two), _operator(op)
{}

而创建环形队列的时候,会自动调用构造函数。此时发现没有匹配的构造函数(无参),就会报错!

解决办法是,添加上一个无参构造,直接使用default关键字即可

1
Task() = default;//使用系统默认生成的无参构造

运行,可以看到生产消费稳定跑起来了!

1
2
3
4
5
6
7
8
9
[muxue@bt-7274:~/git/linux/code/23-01-17 ringqueue]$ ./test
producter[139758380586752] 1673936332 生产了一个任务: 24-8 = ?
producter[139758380586752] 1673936333 生产了一个任务: 34/19 = ?
consumer [139758372194048] 1673936333 消费了一个任务: 24-8 = 16
consumer [139758372194048] 1673936333 消费了一个任务: 34/19 = 1
producter[139758380586752] 1673936334 生产了一个任务: 18*16 = ?
consumer [139758372194048] 1673936334 消费了一个任务: 18*16 = 288
producter[139758380586752] 1673936335 生产了一个任务: 48+15 = ?
consumer [139758372194048] 1673936335 消费了一个任务: 48+15 = 63

增多线程,也能正常运行!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[muxue@bt-7274:~/git/linux/code/23-01-17 ringqueue]$ ./test
producter[140344921630464] 1673936659 生产了一个任务: 44-15 = ?
consumer [140344913237760] 1673936660 消费了一个任务: 44-15 = 29
producter[140344921630464] 1673936660 生产了一个任务: 33+14 = ?
consumer [140344913237760] 1673936660 消费了一个任务: 33+14 = 47
producter[140344904845056] 1673936661 生产了一个任务: 14/12 = ?
consumer [140344896452352] 1673936661 消费了一个任务: 14/12 = 1
producter[140344921630464] 1673936661 生产了一个任务: 24-16 = ?
consumer [140344913237760] 1673936661 消费了一个任务: 24-16 = 8
producter[140344904845056] 1673936662 生产了一个任务: 3-3 = ?
consumer [140344896452352] 1673936662 消费了一个任务: 3-3 = 0
producter[140344921630464] 1673936662 生产了一个任务: 36*7 = ?
consumer [140344913237760] 1673936662 消费了一个任务: 36*7 = 252
producter[140344904845056] 1673936663 生产了一个任务: 28+8 = ?
consumer [140344896452352] 1673936663 消费了一个任务: 28+8 = 36
producter[140344921630464] 1673936663 生产了一个任务: 26-5 = ?
consumer [140344913237760] 1673936663 消费了一个任务: 26-5 = 21

结语

关于线程同步的知识点,大概就是这些了。博客写的满满当当,在理解了接口的基本命名和使用逻辑后,感觉就没有那么难了

加油哦!