09-并发控制

09-并发控制:同步

希望可以控制顺序

  • 控制并发,使得 “两个或两个以上随时间变化的量在变化过程中保持一定的相对关系”

1.开始同时执行

2.最后等待同时执行

互相已知

从一个简单状态到另一个简单状态(状态分支聚起来)

简单->复杂->简单……..

先到先等,先完成等待

sync

每个线程都有wait_next_beat等待下一个拍子再执行

1
2
3
4
5
void wait_next_beat() {
retry:
if (!next_beat_has_come) {
goto retry;
}

同步(synchronization)指的是多个线程在执行时保持协调,以确保它们在预期的时间点上执行特定的操作。具体来说,这里的同步是指:

  1. 开始时间一致:确保某些操作在多个线程中同时开始。例如,wait_next_beat 函数确保 T_player 线程在预期的节拍时开始播放音符。
  2. 协调执行:确保线程在执行过程中保持协调。例如,release_beat函数由 T_conductor线程调用,以通知 T_player线程可以继续到下一个节拍。

在这个代码中,T_conductor线程通过调用 release_beat来增加节拍计数 n,而 T_player线程通过 wait_next_beat来等待节拍计数达到预期值,从而实现同步。

生产者-消费者问题

共享缓冲区

打印左括号的条件:缓冲区未满就可以

打印右括号的条件:当前有左括号就可以

等到达成同步条件再执行

错误1:ready是共享变量,可能已经在解锁时被其他线程改变了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void T_produce() {
while (1) {
retry:
mutex_lock(&lk);
int ready = (depth < n);
mutex_unlock(&lk);
if (!ready) goto retry;

// assert(depth < n);

mutex_lock(&lk);
printf("(");
depth++;
mutex_unlock(&lk);
}
}

正确:但是如果条件不满足,就会不断锁上再解锁,浪费资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void T_produce(){
while(1){
retry:
mutex_lock(&lk);
if(!(deepth < n)){
mutex_unlock(&lk);
goto retry;
}

assert(depth < n);

printf("(");
depth++;
mutex_unlock(&lk);
}
}

条件变量

1
2
#define CANPRODUCE (depth < n)
#define CANCONSUME (depth > 0)

不符合条件时,睡眠,等待被唤醒

生产者和消费者是两个不同的线程

所以以下是错误的,因为生产者和消费者的条件变量不相同,唤醒不一定满足自己的条件,所以会使断言不一定正确(详见ostep)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void T_produce() {
while (1) {
mutex_lock(&lk);
if (!CAN_PRODUCE) {
cond_wait(&cv, &lk);
}
// assert(CAN_PRODUCE);
printf("(");
depth++;

cond_signal(&cv);
mutex_unlock(&lk);
}
}

条件变量的正确使用,每次改动后都通知所有线程

1
2
3
4
5
6
7
8
mutex_lock(&mutex);
while (!COND)
{
wait(&cv, &mutex);
}
assert(cond);
...
mutex_unlock(&mutex);

并行编程的本质

把任务分解

生成有向无环图

并行计算,每个节点满足条件就计算

拓扑排序,计算

打印鱼

把状态图画出来,根据状态图得到条件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
const char roles[] = ".<<<<<<<>>>>>>>______";
//每个线程获得一个字符,如果太少就不能等到下一个状态,就会阻塞
void fish_thread(int id) {
char role = roles[id];
while (1) {
mutex_lock(&lk);
while (!can_print(role)) {
cond_wait(&cv, &lk);
}

putchar(role); // Not lock-protected

current = next(role);
assert(current);
cond_broadcast(&cv);
mutex_unlock(&lk);
}
}