We are going to implement a simple producer/consumer program which runs a single producer of values in its own thread, as well as a single consumer thread in another thread:
- First, we need to perform all the needed includes:
#include <iostream>
#include <queue>
#include <tuple>
#include <condition_variable>
#include <thread>
using namespace std;
using namespace chrono_literals;
- We instantiate a queue of simple numeric values and call it q. The producer will push values into it, and the consumer will take values out of it. In order to synchronize both, we need a mutex. In addition to that, we instantiate a condition_variable cv. The variable finished will be the producer's way to tell the consumer that no more values will follow:
queue<size_t> q;
mutex mut;
condition_variable cv;
bool finished {false};
- Let's first implement the producer function. It accepts an argument items which limits the maximum number of items for production. In a simple loop, it will sleep 100 milliseconds for every item, which simulates some computational complexity. Then we lock the mutex that synchronizes access to the queue. After successful production and insertion to the queue, we call cv.notify_all(). This function wakes the consumer up. We will see later at the consumer side how this works:
static void producer(size_t items) {
for (size_t i {0}; i < items; ++i) {
this_thread::sleep_for(100ms);
{
lock_guard<mutex> lk {mut};
q.push(i);
}
cv.notify_all();
}
- After having produced all items, we lock the mutex again because we are going to change to set the finished bit. Then we call cv.notify_all() again:
{
lock_guard<mutex> lk {mut};
finished = true;
}
cv.notify_all();
}
- Now we can implement the consumer function. It takes no arguments because it will blindly consume until the queue runs empty. In a loop that is executed as long as finished is not set, it will first lock the mutex that protects both the queue and the finished flag. As soon as it has the lock, it calls cv.wait with the lock and a lambda expression as arguments. The lambda expression is a predicate that tells if the producer thread is still alive and if there is anything to consume in the queue:
static void consumer() {
while (!finished) {
unique_lock<mutex> l {mut};
cv.wait(l, [] { return !q.empty() || finished; });
- The cv.wait call unlocks the lock and waits until the condition described by the predicate function holds. Then, it locks the mutex again and consumes everything from the queue until it appears empty. If the producer is still alive, it will iterate through the loop again. Otherwise, it will terminate because finished is set, which is the producer's way to signal that there are no further items being produced:
while (!q.empty()) {
cout << "Got " << q.front()
<< " from queue.n";
q.pop();
}
}
}
- In the main function, we start a producer thread which produces 10 items, and a consumer thread. Then we wait until their completion and terminate the program:
int main() {
thread t1 {producer, 10};
thread t2 {consumer};
t1.join();
t2.join();
cout << "finished!n";
}
- Compiling and running the program yields the following output. When the program is executed, we can see that there is some time (100 milliseconds) between each line, because the production of items takes some time:
$ ./producer_consumer
Got 0 from queue.
Got 1 from queue.
Got 2 from queue.
Got 3 from queue.
Got 4 from queue.
Got 5 from queue.
Got 6 from queue.
Got 7 from queue.
Got 8 from queue.
Got 9 from queue.
finished!