In this section, we are going to implement a program like the one in the previous recipe, but this time with multiple producers and multiple consumers:
- First, we include all the headers we need and declare that we use the std and chrono_literals namespaces:
#include <iostream>
#include <iomanip>
#include <sstream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
using namespace std;
using namespace chrono_literals;
- Then we implement the synchronized printing helper from the other recipe in this chapter because we are going to do a lot of concurrent printing:
struct pcout : public stringstream {
    static inline mutex cout_mutex;
    ~pcout() {
        lock_guard<mutex> l {cout_mutex};
        cout << rdbuf();
    }
};
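To see why the helper keeps lines intact, the following minimal sketch restates it with explicit std:: qualifiers and lets several threads print through it. The function print_from_threads and its message text are hypothetical names used only for this illustration; it redirects std::cout into a string so that the result can be inspected.

```cpp
#include <cstddef>
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Same helper as above: each temporary buffers its text and flushes it
// to std::cout under a lock when it is destroyed.
struct pcout : public std::stringstream {
    static inline std::mutex cout_mutex;
    ~pcout() {
        std::lock_guard<std::mutex> l {cout_mutex};
        std::cout << rdbuf();
    }
};

// Hypothetical helper for illustration: redirects std::cout into a string,
// lets n threads print one line each through pcout, and returns the text.
std::string print_from_threads(std::size_t n)
{
    std::ostringstream captured;
    std::streambuf *old {std::cout.rdbuf(captured.rdbuf())};

    std::vector<std::thread> threads;
    for (std::size_t i = 0; i < n; ++i) {
        threads.emplace_back([i] {
            // The whole line is assembled first and written in one piece.
            pcout{} << "thread " << i << " says hello\n";
        });
    }
    for (auto &t : threads) { t.join(); }

    std::cout.rdbuf(old);   // restore the real stream buffer
    return captured.str();
}
```

The order of the lines depends on thread scheduling, but every line arrives in one piece because the actual write to std::cout only happens in the destructor, under the mutex.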
- All producers write values into the same queue, and all consumers take values out of it. In addition to the queue, we need a mutex that protects both the queue and a flag that tells whether production has stopped:
queue<size_t> q;
mutex q_mutex;
bool production_stopped {false};
- We are going to employ two different condition_variables in this program. In the single producer/consumer recipe, we had one condition_variable that signaled new items in the queue. Here, we make it a bit more complicated: we want the producers to produce until the queue contains a certain stock of items. Once that stock is reached, they go to sleep. The go_consume variable wakes up consumers, which in turn wake up the producers again through the go_produce variable:
condition_variable go_produce;
condition_variable go_consume;
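The interplay of the two condition variables can be looked at in isolation with a small bounded buffer. The following is only a sketch under our own assumptions: the class bounded_queue, its capacity member, and the function sum_through_queue are hypothetical and not part of this recipe.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical bounded buffer, not part of the recipe: it wraps the same
// idea (one mutex, two condition variables) into a small class.
class bounded_queue {
    std::queue<std::size_t> q;
    std::mutex m;
    std::condition_variable go_produce;  // signaled when space frees up
    std::condition_variable go_consume;  // signaled when an item arrives
    const std::size_t capacity;          // must be at least 1

public:
    explicit bounded_queue(std::size_t cap) : capacity{cap} {}

    // Blocks while the queue is full, then stores the value.
    void push(std::size_t value) {
        std::unique_lock<std::mutex> lock {m};
        go_produce.wait(lock, [this] { return q.size() < capacity; });
        q.push(value);
        go_consume.notify_all();
    }

    // Blocks while the queue is empty, then removes and returns the front.
    std::size_t pop() {
        std::unique_lock<std::mutex> lock {m};
        go_consume.wait(lock, [this] { return !q.empty(); });
        const std::size_t value {q.front()};
        q.pop();
        go_produce.notify_all();
        return value;
    }
};

// Pushes 0..n-1 from a second thread and sums them up in the calling thread.
std::size_t sum_through_queue(std::size_t n, std::size_t capacity)
{
    bounded_queue bq {capacity};
    std::thread producer {[&] {
        for (std::size_t i = 0; i < n; ++i) { bq.push(i); }
    }};
    std::size_t sum {0};
    for (std::size_t i = 0; i < n; ++i) { sum += bq.pop(); }
    producer.join();
    return sum;
}
```

Calling sum_through_queue(100, 5) moves all values through a buffer that never holds more than five items at a time: whenever the buffer is full, the producer sleeps until a pop wakes it up again.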
- The producer function accepts a producer ID number, a total number of items to produce and a stock limit as arguments. It then enters its own production loop. There, it first locks the queue's mutex and unlocks it again in the go_produce.wait call. It waits for the condition that the queue size is below the stock threshold:
static void producer(size_t id, size_t items, size_t stock)
{
    for (size_t i = 0; i < items; ++i) {
        unique_lock<mutex> lock(q_mutex);
        go_produce.wait(lock,
            [&] { return q.size() < stock; });
- After the producer has been woken up, it produces an item and pushes it into the queue. The queue value is calculated from the expression id * 100 + i. This way we can later see which producer produced it, because the hundreds digit of the number is the producer ID. We also print the production event to the terminal. The format of the printing may look strange, but it will align nicely with the consumer output in the terminal later:
        q.push(id * 100 + i);
        pcout{} << " Producer " << id << " --> item "
                << setw(3) << q.back() << '\n';
- After production, we can wake up sleeping consumers. A sleeping period of 90 milliseconds simulates that producing items takes some time:
        go_consume.notify_all();
        this_thread::sleep_for(90ms);
    }
    pcout{} << "EXIT: Producer " << id << '\n';
}
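The wait call with a predicate that the producer uses is equivalent to calling the plain wait in a loop until the predicate holds. The following minimal sketch spells that loop out; the names m, cv, ready, wait_with_loop, and demo are hypothetical and only serve this illustration.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

std::mutex m;
std::condition_variable cv;
bool ready {false};   // hypothetical condition, guarded by m

// Waits for `ready` with an explicit loop, which is equivalent to
// cv.wait(lock, [] { return ready; }).
bool wait_with_loop()
{
    std::unique_lock<std::mutex> lock {m};
    while (!ready) {     // re-check the condition after every wakeup
        cv.wait(lock);   // unlocks m while sleeping, relocks before returning
    }
    return ready;
}

// Lets a second thread set the condition while this thread waits for it.
bool demo()
{
    std::thread setter {[] {
        { std::lock_guard<std::mutex> l {m}; ready = true; }
        cv.notify_all();
    }};
    const bool result {wait_with_loop()};
    setter.join();
    return result;
}
```

Because the condition is re-checked after every wakeup, spurious wakeups and notifications meant for other threads do no harm: a producer that is woken up while the queue is still full simply goes back to sleep.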
- Next comes the consumer function, which only accepts a consumer ID as an argument. It keeps waiting for items as long as production has not stopped or the queue is not empty. If the queue is empty but production has not stopped, new items may still arrive soon:
static void consumer(size_t id)
{
    while (!production_stopped || !q.empty()) {
        unique_lock<mutex> lock(q_mutex);
- After locking the queue mutex, we unlock it again in order to wait on the go_consume condition variable. The lambda expression argument states that we want to return from the wait call as soon as the queue contains items. The second argument, 1s, says that we do not want to wait forever: if it takes longer than 1 second, we drop out of the wait function. The return value of wait_for tells us which case occurred: it returns true if the predicate holds, and false if we dropped out because of a timeout while the queue was still empty. If there are new items in the queue, we consume them and print this event to the terminal:
        if (go_consume.wait_for(lock, 1s,
                [] { return !q.empty(); })) {
            pcout{} << " item "
                    << setw(3) << q.front()
                    << " --> Consumer "
                    << id << '\n';
            q.pop();
- After item consumption, we notify the producers and sleep for 130 milliseconds to simulate that consuming items is also time-consuming:
            go_produce.notify_all();
            this_thread::sleep_for(130ms);
        }
    }
    pcout{} << "EXIT: Consumer " << id << '\n';
}
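The two possible outcomes of wait_for can be observed without any second thread. In this minimal sketch, the helper wait_for_flag is a hypothetical name; it returns exactly what wait_for returns for a predicate that is either already true or never becomes true.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper: waits up to `timeout` for `flag` to be true and
// returns whatever wait_for returns.
bool wait_for_flag(bool flag, std::chrono::milliseconds timeout)
{
    std::mutex m;
    std::condition_variable cv;
    std::unique_lock<std::mutex> lock {m};
    // The predicate is checked before sleeping, so a true flag returns
    // immediately. A false flag is never changed by anyone here, so the
    // call runs into the timeout and reports the predicate's final value.
    return cv.wait_for(lock, timeout, [&] { return flag; });
}
```

With a true flag, the call returns true right away. With a false flag, it returns false after the timeout has elapsed, which is the case in which the consumer above skips the consumption branch and re-evaluates its loop condition.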
- In the main function, we instantiate a vector for worker threads and another for consumer threads:
int main()
{
    vector<thread> workers;
    vector<thread> consumers;
- Then we spawn three producer threads and five consumer threads:
    for (size_t i = 0; i < 3; ++i) {
        workers.emplace_back(producer, i, 15, 5);
    }
    for (size_t i = 0; i < 5; ++i) {
        consumers.emplace_back(consumer, i);
    }
- We first let the producer threads finish. As soon as all of them have returned, we set the production_stopped flag, which leads the consumers to finish, too. We join those as well, and then the program can exit:
    for (auto &t : workers) { t.join(); }
    production_stopped = true;
    for (auto &t : consumers) { t.join(); }
}
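Note that the program sets production_stopped in main and reads it in the consumer loop condition without holding q_mutex. Strictly speaking, a plain bool that is written by one thread and read by others without synchronization is a data race in C++. The following is a sketch of one possible variant, not the code of this recipe: the flag is only touched under the mutex, and the consumers are notified on shutdown so that they do not have to run into a timeout. The stock limit is left out for brevity, and run_pipeline and consumed are hypothetical names.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Sketch of a variant, not the recipe's code: all shared state is only
// touched while q_mutex is held, and consumers are notified on shutdown.
std::size_t run_pipeline(std::size_t producers, std::size_t consumers,
                         std::size_t items_each)
{
    std::queue<std::size_t> q;
    std::mutex q_mutex;
    std::condition_variable go_consume;
    bool production_stopped {false};
    std::size_t consumed {0};   // hypothetical counter, guarded by q_mutex

    auto produce = [&](std::size_t id) {
        for (std::size_t i = 0; i < items_each; ++i) {
            { std::lock_guard<std::mutex> l {q_mutex}; q.push(id * 100 + i); }
            go_consume.notify_one();
        }
    };
    auto consume = [&] {
        std::unique_lock<std::mutex> lock {q_mutex};
        for (;;) {
            // Wake up for a new item or for the end of production.
            go_consume.wait(lock,
                [&] { return !q.empty() || production_stopped; });
            if (q.empty()) { return; }   // stopped and fully drained
            q.pop();
            ++consumed;
        }
    };

    std::vector<std::thread> workers;
    std::vector<std::thread> eaters;
    for (std::size_t i = 0; i < producers; ++i) {
        workers.emplace_back(produce, i);
    }
    for (std::size_t i = 0; i < consumers; ++i) {
        eaters.emplace_back(consume);
    }

    for (auto &t : workers) { t.join(); }
    { std::lock_guard<std::mutex> l {q_mutex}; production_stopped = true; }
    go_consume.notify_all();   // nobody has to wait for a timeout
    for (auto &t : eaters) { t.join(); }
    return consumed;
}
```

With three producers, five consumers, and 15 items each, run_pipeline(3, 5, 15) returns 45, because every produced item is consumed exactly once before the consumers exit.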
- Compiling and running the program leads to the following output. The output is very long, which is why it is truncated here. We can see that the producers go to sleep from time to time and let the consumers eat up some items until they finally produce again. It is interesting to alter the wait times for producers and consumers, as well as the number of producers, consumers, and stock items, because this completely changes the output patterns:
$ ./multi_producer_consumer
Producer 0 --> item 0
Producer 1 --> item 100
item 0 --> Consumer 0
Producer 2 --> item 200
item 100 --> Consumer 1
item 200 --> Consumer 2
Producer 0 --> item 1
Producer 1 --> item 101
item 1 --> Consumer 0
...
Producer 0 --> item 14
EXIT: Producer 0
Producer 1 --> item 114
EXIT: Producer 1
item 14 --> Consumer 0
Producer 2 --> item 214
EXIT: Producer 2
item 114 --> Consumer 1
item 214 --> Consumer 2
EXIT: Consumer 2
EXIT: Consumer 3
EXIT: Consumer 4
EXIT: Consumer 0
EXIT: Consumer 1
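The item values in the output can be split back into the producer ID and the loop counter by reversing the expression id * 100 + i. The helper decode_item below is a hypothetical name used for this small sketch.

```cpp
#include <cstddef>
#include <utility>

// Splits an item value produced as id * 100 + i back into {id, i}.
// This is only unambiguous while every producer makes fewer than 100 items.
std::pair<std::size_t, std::size_t> decode_item(std::size_t item)
{
    return {item / 100, item % 100};
}
```

For example, the item 214 from the output decodes to producer 2 and counter 14, which is the last of the 15 items that producer 2 makes.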