This recipe is an extension of the preceding recipe. Instead of synchronizing only one producer with one consumer, we implemented a program that synchronizes M producers with N consumers. On top of that, consumers are not the only threads that go to sleep when no items are left for them: producers also go to sleep as soon as the item queue becomes too long.
When multiple consumers wait for the same queue to fill up, the consumer code from the one producer/one consumer scenario generally still works. As long as only one thread at a time locks the mutex that protects the queue and then takes items out of it, the code is safe. It does not matter how many threads are waiting for the lock at the same time. The same applies to the producers, because in both scenarios the only important thing is that the queue is never accessed by more than one thread at a time.
So what really makes this program more complex than just running the one producer/one consumer example with more threads is that the producer threads stop as soon as the item queue length reaches a certain threshold. In order to meet that requirement, we implemented two signals, each with its own condition_variable:
- go_produce signals the event that the queue is no longer filled to its maximum, so the producers may fill it up again.
- go_consume signals the event that the queue contains items, so the consumers are free to consume again.
This way producers fill items into the queue and signal the go_consume event to the consuming threads, which wait on the following line:
if (go_consume.wait_for(lock, 1s, [] { return !q.empty(); })) {
    // got the event without timeout
}
The producers, on the other hand, wait on the following line until they are allowed to produce again:
go_produce.wait(lock, [&] { return q.size() < stock; });
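To see the two wait calls in context, here is a minimal sketch of M producers and N consumers sharing one bounded queue. The function name run_pipeline, its parameters, and the production_stopped flag are assumptions made for this illustration. The flag lets consumers use the timeout only to recheck whether they should exit, which avoids ending early if producers are slow to start.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical helper: runs the given number of producer and consumer
// threads over one bounded queue and returns how many items were consumed.
std::size_t run_pipeline(std::size_t producers, std::size_t consumers,
                         std::size_t items_per_producer, std::size_t stock,
                         std::chrono::milliseconds idle_timeout = std::chrono::seconds(1))
{
    assert(stock > 0); // with stock == 0 no producer could ever proceed

    std::mutex mut;                      // protects everything below
    std::condition_variable go_produce;  // "there is room in the queue"
    std::condition_variable go_consume;  // "there are items in the queue"
    std::queue<std::size_t> q;
    bool production_stopped = false;     // assumption: extra exit flag
    std::size_t consumed = 0;

    auto producer = [&](std::size_t id) {
        for (std::size_t i = 0; i < items_per_producer; ++i) {
            std::unique_lock<std::mutex> lock{mut};
            // Sleep while the queue is full.
            go_produce.wait(lock, [&] { return q.size() < stock; });
            q.push(id * items_per_producer + i);
            assert(q.size() <= stock); // the limit is never exceeded
            go_consume.notify_all();
        }
    };

    auto consumer = [&] {
        for (;;) {
            std::unique_lock<std::mutex> lock{mut};
            if (go_consume.wait_for(lock, idle_timeout,
                                    [&] { return !q.empty(); })) {
                q.pop(); // got the event without timeout
                ++consumed;
                go_produce.notify_all();
            } else if (production_stopped) {
                return;  // timed out on an empty queue, nothing more to come
            }
        }
    };

    std::vector<std::thread> producer_threads;
    std::vector<std::thread> consumer_threads;
    for (std::size_t i = 0; i < producers; ++i) {
        producer_threads.emplace_back(producer, i);
    }
    for (std::size_t i = 0; i < consumers; ++i) {
        consumer_threads.emplace_back(consumer);
    }

    for (auto &t : producer_threads) { t.join(); }
    {
        std::lock_guard<std::mutex> lock{mut};
        production_stopped = true;
    }
    for (auto &t : consumer_threads) { t.join(); }
    return consumed;
}
```

Calling run_pipeline(3, 5, 20, 4) from main would run three producers that create 20 items each and five consumers, with at most four items queued at any time. The call should return 60 once every item has been consumed.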
One interesting detail is that we do not let consumers wait forever. The go_consume.wait_for call also takes a timeout argument of 1 second. This is the exit mechanism for consumers: if the queue stays empty for longer than a second, there may be no active producers left.
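The return value of wait_for is what distinguishes the two outcomes: with a predicate, it returns the value of the predicate when the wait ends, so false means that the timeout expired while the queue was still empty. The following sketch isolates that behavior. The helper name try_consume and its parameters are hypothetical, and the timeout is a parameter here only so that it can be tried with short waits.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>

// Hypothetical helper: waits up to `timeout` for an item. Returns true and
// stores the item in `out` if one arrived, false if the wait timed out.
bool try_consume(std::mutex &mut, std::condition_variable &go_consume,
                 std::queue<int> &q, std::chrono::milliseconds timeout,
                 int &out)
{
    std::unique_lock<std::mutex> lock{mut};
    if (!go_consume.wait_for(lock, timeout, [&] { return !q.empty(); })) {
        return false; // timed out, the queue is still empty
    }
    assert(!q.empty()); // guaranteed by the predicate
    out = q.front();
    q.pop();
    return true;
}
```

A consumer loop built on such a helper could treat a false return as a hint to check whether it should stop, rather than as an error.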
For the sake of simplicity, the code always tries to keep the queue at its maximum length. A more sophisticated program could let the consumer threads send a wake-up notification only when the queue has shrunk to half of its maximum length. This way, producers would be woken up before the queue runs empty, but not unnecessarily early while there are still enough items in the queue.
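One way such a half-full policy might look on the consumer side is sketched below. This is not part of the recipe: the item_queue struct and the pop_and_maybe_notify function are hypothetical names, and "half" is taken to mean stock / 2 with integer division.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>

// Hypothetical shared state for this sketch.
struct item_queue {
    std::mutex mut;
    std::condition_variable go_produce;
    std::queue<int> q;
    std::size_t stock = 4; // maximum queue length
};

// Pops one item and wakes the producers only once the queue has drained to
// half of `stock` or less. Returns true if a notification was sent.
// Precondition: the queue is not empty.
bool pop_and_maybe_notify(item_queue &s)
{
    std::lock_guard<std::mutex> lock{s.mut};
    assert(!s.q.empty());
    s.q.pop();
    if (s.q.size() <= s.stock / 2) {
        s.go_produce.notify_all();
        return true;
    }
    return false;
}
```

The producers' wait condition can stay as q.size() < stock. The threshold only delays waking producers that are already asleep. A producer that reaches its wait call while there is room still proceeds at once, because wait checks the predicate before it blocks.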
One situation that condition_variable solves elegantly for us is the following: if a consumer fires the go_produce notification, there might be a horde of producers racing to produce the next item. If only one item is missing, then only one producer will produce it. If every producer always produced an item as soon as the go_produce event fired, the queue would often be filled beyond its allowed maximum.
Let's imagine that we have (max - 1) items in the queue and want one new item produced so that the queue is filled up again. No matter whether a consumer thread calls go_produce.notify_one() (which wakes up only one waiting thread) or go_produce.notify_all() (which wakes up all waiting threads), we have the guarantee that only one producer thread will exit the go_produce.wait call. The woken threads reacquire the mutex one at a time, and wait re-evaluates the predicate each time. Once the first producer has filled the free slot, the q.size() < stock wait condition no longer holds for any of the other producer threads when they get the mutex, so they go back to sleep.
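This guarantee can be checked with a small experiment, sketched here under a few assumptions. The function name count_producers_for_one_slot is hypothetical, each producer tries to produce only once, and a done flag plus a second condition variable (refilled) are added purely so that the experiment can shut down cleanly.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical experiment: several producers wait on a full queue, exactly
// one slot is freed, and notify_all() wakes them all. Returns how many of
// them actually produced an item.
std::size_t count_producers_for_one_slot(std::size_t producers,
                                         std::size_t stock)
{
    assert(producers > 0 && stock > 0);

    std::mutex mut;
    std::condition_variable go_produce;
    std::condition_variable refilled; // assumption: signals "queue full again"
    std::queue<int> q;
    bool done = false;                // assumption: shutdown flag
    std::size_t produced = 0;

    for (std::size_t i = 0; i < stock; ++i) { q.push(0); } // start full

    auto producer = [&] {
        std::unique_lock<std::mutex> lock{mut};
        go_produce.wait(lock, [&] { return q.size() < stock || done; });
        if (q.size() >= stock) { return; } // woken for shutdown, no free slot
        q.push(1);
        assert(q.size() <= stock);
        ++produced;
        refilled.notify_one();
    };

    std::vector<std::thread> threads;
    for (std::size_t i = 0; i < producers; ++i) {
        threads.emplace_back(producer);
    }

    {
        std::unique_lock<std::mutex> lock{mut};
        q.pop();                 // free exactly one slot
        go_produce.notify_all(); // wake every waiting producer
        refilled.wait(lock, [&] { return q.size() == stock; });
        done = true;             // let the remaining producers exit
        go_produce.notify_all();
    }

    for (auto &t : threads) { t.join(); }
    return produced;
}
```

With eight producers and a stock of three, count_producers_for_one_slot(8, 3) should return 1: whichever producer gets the mutex first fills the slot, and every other producer finds the queue full again.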