In this recipe, we simply started two threads. The first thread produces items and puts them into a queue. The other takes items out of the queue. Whenever one of those threads touches the queue in any way, it locks the shared mutex mut, which both threads can access. This ensures that the two threads never manipulate the queue's state at the same time.
In total, we declared four variables that were involved in the producer-consumer logic, including the queue and the mutex:
queue<size_t> q;
mutex mut;
condition_variable cv;
bool finished {false};
The variable finished is easy to explain. It was set to true when the producer finished producing its fixed number of items. When the consumer sees that this variable is true, it consumes the last items in the queue and stops consuming. But what is the condition_variable cv for? We used cv in two different contexts. One of the contexts was waiting for a specific condition, and the other was signaling that condition.
The consumer side that waits for a specific condition looks like this. The consumer thread loops over a block that first locks mutex mut in a unique_lock. Then it calls cv.wait:
while (!finished) {
    unique_lock<mutex> l {mut};
    cv.wait(l, [] { return !q.empty() || finished; });
    while (!q.empty()) {
        // consume
    }
}
This code is roughly equivalent to the following alternative code. We will explain shortly why the two are not really the same:
while (!finished) {
    unique_lock<mutex> l {mut};
    while (q.empty() && !finished) {
        l.unlock();
        l.lock();
    }
    while (!q.empty()) {
        // consume
    }
}
This means that we generally first acquire the lock and then check what scenario we have:
- Are there items to consume? Then keep the lock, consume, release the lock, and start over.
- Else, if there are no consumable items but the producer is still alive, release the mutex to give the producer a chance to add items to the queue. Then, try to lock it again in the hope that the situation has changed and we now see the first scenario.
The real reason why the cv.wait line is not equivalent to the while (q.empty() && ... ) construct is that we cannot simply loop over an l.unlock(); l.lock(); cycle. If the producer thread is inactive for some time, this would lead to continuous locking and unlocking of the mutex, which needlessly burns CPU cycles without making any progress.
An expression like cv.wait(lock, predicate) will wait until predicate() returns true. But it does not do this by continuously unlocking and locking lock. Instead, it releases lock and puts the calling thread to sleep, so that other threads can acquire the mutex in the meantime. In order to wake up a thread that blocks on the wait call of a condition_variable object, another thread has to call the notify_one() or notify_all() method on the same object. The waiting threads are then woken from their sleep, re-acquire lock, and check whether predicate() holds.
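To make this concrete, here is a minimal sketch of what the predicate overload boils down to. The C++ standard describes cv.wait(lock, predicate) as equivalent to a loop around the single-argument cv.wait(lock). The Signal struct and the functions wait_for_flag and set_flag are hypothetical names introduced only for this illustration; they are not part of the recipe.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical bundle of shared state, used only for this illustration.
struct Signal {
    std::mutex mut;
    std::condition_variable cv;
    bool ready {false};
};

// Hand-written equivalent of: s.cv.wait(l, [&] { return s.ready; });
void wait_for_flag(Signal &s) {
    std::unique_lock<std::mutex> l {s.mut};
    while (!s.ready) {
        // Atomically releases the mutex and blocks. The mutex is
        // re-acquired before wait() returns, so the loop condition
        // is always checked while holding the lock.
        s.cv.wait(l);
    }
}

// Sets the flag under the lock, then wakes up all waiting threads.
void set_flag(Signal &s) {
    {
        std::lock_guard<std::mutex> l {s.mut};
        s.ready = true;
    }
    s.cv.notify_all();
}
```

If one thread calls wait_for_flag and another calls set_flag, the waiter returns regardless of which call happens first, because the flag is checked before the thread ever goes to sleep.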
The nice thing about the wait call checking the predicate is that if there is a spurious wakeup, or a notification that arrives while the predicate is still false, the thread immediately goes back to sleep. This means that too many notify calls do not harm the program flow, although they may cost some performance.
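The following sketch illustrates that premature notifications are harmless. The function wait_despite_extra_notifies and its threshold of 3 are assumptions made up for this example. The notifying thread calls notify_all() after every increment, but the waiter only proceeds once its predicate is satisfied.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical demo: returns the value the waiter observed when it
// finally got past the wait call.
int wait_despite_extra_notifies() {
    std::mutex mut;
    std::condition_variable cv;
    int value {0};
    int seen {0};

    std::thread waiter {[&] {
        std::unique_lock<std::mutex> l {mut};
        // Any wakeup with value < 3 sends the thread back to sleep.
        cv.wait(l, [&] { return value >= 3; });
        seen = value;
    }};

    for (int i {0}; i < 3; ++i) {
        {
            std::lock_guard<std::mutex> l {mut};
            ++value;
        }
        // The first two notifications come too early: the predicate
        // is still false, so a woken waiter simply waits again.
        cv.notify_all();
    }

    waiter.join();
    return seen;
}
```

Whatever the thread scheduling looks like, the waiter can only continue once value has reached 3, so the function returns 3.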
On the producer side, we just called cv.notify_all() after the producer inserted an item into the queue, and again after it produced its last item and set the finished flag to true. This was enough to direct the consumer.
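Putting both sides together, the interplay can be sketched as follows. This is not the recipe's exact code: the wrapper function run_producer_consumer, its count parameter, and the consumed vector are hypothetical additions that make the sketch self-contained. As a deliberate deviation, the consumer reads finished only while it holds the lock, instead of in the loop condition.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical wrapper around the four shared variables of the recipe.
// Returns the items in the order in which the consumer received them.
std::vector<std::size_t> run_producer_consumer(std::size_t count) {
    std::queue<std::size_t> q;
    std::mutex mut;
    std::condition_variable cv;
    bool finished {false};
    std::vector<std::size_t> consumed;

    std::thread producer {[&] {
        for (std::size_t i {0}; i < count; ++i) {
            {
                std::lock_guard<std::mutex> l {mut};
                q.push(i);
            }
            cv.notify_all(); // signal: a new item is available
        }
        {
            std::lock_guard<std::mutex> l {mut};
            finished = true;
        }
        cv.notify_all(); // signal: nothing more will arrive
    }};

    std::thread consumer {[&] {
        bool done {false};
        while (!done) {
            std::unique_lock<std::mutex> l {mut};
            cv.wait(l, [&] { return !q.empty() || finished; });
            while (!q.empty()) {
                consumed.push_back(q.front());
                q.pop();
            }
            done = finished; // read while still holding the lock
        }
    }};

    producer.join();
    consumer.join();
    return consumed;
}
```

Calling run_producer_consumer(5) yields the items 0 through 4 in order, because the producer pushes every item before it sets finished, and the consumer drains the queue before it checks the flag.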