生产者-消费者和条件变量

1 条件变量

某个线程需要检查某个条件是否满足, 只有当条件满足时它才继续执行, 但是这个条件的状态与其它线程的执行相关。这是一种常见的场景, 通常有两种比较容易想到的方法去判断条件是否成立

  • 通过spin的方式轮询一个共享变量(缺点:效率低,浪费CPU时间)
  • 使用条件变量: 条件不满足,进入睡眠状态; 条件满足, (被唤醒)继续执行

A condition variable is an explicit queue that threads can put themselves on when some state of execution (i.e., some condition) is not as desired (by waiting on the condition); some other thread, when it changes said state, can then wake one (or more) of those waiting threads and thus allo them to continue (by signaling on the condition).

POSIX
1
2
pthread_cond_wait(pthread_cond_t *c, pthread_mutex_t *m);
pthread_cond_signal(pthread_cond_t *c);
C++
1
2
3
4
5
6
std::condition_variable cv;
std::mutext mtx;
std::unique_lock<std::mutex> lk(mtx);
cv.wait(lk);
cv.notify_one();
cv.notify_all();

PS:

  • 执行wait前必须拥有🔒
  • wait会让当前进程释放🔒, 进入睡眠状态
  • 如果被唤醒了, 会先重新获得🔒, 然后从wait return

2 生产者-消费者问题

生产者-消费者问题, 也称作有限缓冲问题, 是多线程同步的一个经典问题。而这一问题的一种经典解法可以通过1把锁+2个条件变量+while完成。关于如下问题, 可以阅读《Three Easy Pieces》的30.2节:

  • 一个条件变量行不行
  • 不用while用if有什么问题
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    #include <iostream>
    #include <thread>
    #include <condition_variable>
    #include <assert.h>

    #define CAN_PRODUCE (count < 5)
    #define CNA_CONSUME (count > 0)

    std::mutex mtx;
    std::condition_variable consumer_cv, producer_cv;

    void producer() {
    while (1) {
    std::unique_lock<std::mutex> lk(mtx);
    while (!CAN_PRODUCE) {
    consumer_cv.wait(lk);
    }
    assert(CAN_PRODUCE);
    count++;
    printf("(");
    producer_cv.notify_one();
    }
    }

    void consumer() {
    while (1) {
    std::unique_lock<std::mutex> lk(mtx);
    while (!CAN_CONSUME) {
    producer_cv.wait(lk);
    }
    assert(CAN_CONSUME);
    count--;
    printf(")");
    consumer_cv.notify_one();
    }
    }

    int main() {
    std::thread t1(producer);
    std::thread t2(consumer);
    t1.join();
    t2.join();
    return 0;
    }
notify_all的使用场景
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// how many bytes of the heap are free?
int bytesLeft = MAX_HEAP_SIZE;

// need lock and condition too
cond_t c;
mutex_t m;

void *allocate(int size) {
Pthread_mutex_lock(&m);
while (bytesLeft < size>)
Pthread_cond_wait(&c, &m);
void *ptr = ...; // get mem from heap
bytesLeft -= size;
Pthread_mutex_unlock(&m);
return ptr;
}

void free(void *ptr, int size) {
Pthread_mutex_lock(&m);
bytesLeft += size;
Pthread_cond_signal(&c); // whom to signal
Pthread_mutex_unlock(&m);
}

一个内存分配的case, Pthread_cond_signal只会唤醒其中一个线程。假如线程a和线程b在睡眠期前分别请求了10bytes和100bytes的内存, 但是现在只有50bytes内存, 随机唤醒可能会将b唤醒, 而正确的做法应该是将a唤醒。为了解决这个问题, 应该使用Pthread_cond_broadcast
在C++11中, 这两个API对应着std::condition_variablenotify_onenotify_all

Mesa Semantics

唤醒一个线程, 但并不保证它被唤醒后一定满足继续执行的条件 (使用while代替if来保证唤醒后满足继续执行的条件)。如过能够保证线程被唤醒后可以立即执行后续程序, 称其具有Hoare semantics。关于Mesa semanticsHoare semantics的详细内容, 可以阅读《Three Easy Pieces》30.2节和论文《Monitors: An Operating SystemStructuring Concept》《Experience with Processes and Monitors in Mesa 》

Signaling a thread only wakes them up; it is thus a hint that the state fo the world has changed, but there is no guarantee that when the woken thread runs, the state will still be as desired. This interpretation of what a signal means is often referred to as Mesa semantics.

Comments

Unable to load Disqus, please make sure your network can access.