The conditional variable synchronization primitive is used when two or more threads need to communicate with each other and proceed only when they receive a particular signal or event. The thread that waits for a particular signal or event has to acquire a mutex before it starts waiting for the signal or event.
Let's try to understand the use case of a conditional variable with a producer/consumer problem. I'm going to create two threads, namely PRODUCER and CONSUMER. The PRODUCER thread will add a value to the queue and notify the CONSUMER thread. The CONSUMER thread will wait for the notification from PRODUCER. On receipt of the notification from the PRODUCER thread, the CONSUMER thread will remove the entry from the queue and print it.
Let's understand how the Thread.h header shown here makes use of the conditional variable and mutex:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <string>
using namespace std;
enum ThreadType {
PRODUCER,
CONSUMER
};
class Thread {
private:
static mutex locker;
static condition_variable untilReady;
static bool ready;
static queue<int> appQueue;
thread *pThread;
ThreadType threadType;
bool stopped;
string name;
void run();
public:
Thread(ThreadType typeOfThread);
~Thread();
void start();
void stop();
void join();
void detach();
};
As PRODUCER and CONSUMER threads are supposed to use the same mutex and conditional_variable, they are declared static. The conditional variable synchronization primitive expects a predicate function that is going to make use of the ready boolean flag. Hence, I have declared the ready flag as well in the static scope.
Let's move on to the Thread.cpp source file, as follows:
#include "Thread.h"
mutex Thread::locker;
condition_variable Thread::untilReady;
bool Thread::ready = false;
queue<int> Thread::appQueue;
Thread::Thread( ThreadType typeOfThread ) {
pThread = NULL;
stopped = false;
threadType = typeOfThread;
(CONSUMER == typeOfThread) ? name = "CONSUMER" : name = "PRODUCER";
}
Thread::~Thread( ) {
delete pThread;
pThread = NULL;
}
void Thread::run() {
int count = 0;
int data = 0;
while ( 1 ) {
switch ( threadType ) {
case CONSUMER:
{
cout << name << " waiting to acquire mutex ..." << endl;
unique_lock<mutex> uniqueLocker( locker );
cout << name << " acquired mutex ..." << endl;
cout << name << " waiting for conditional variable signal..." << endl;
untilReady.wait ( uniqueLocker, [] { return ready; } );
cout << name << " received conditional variable signal ..." << endl;
data = appQueue.front( ) ;
cout << name << " received data " << data << endl;
appQueue.pop( );
ready = false;
}
cout << name << " released mutex ..." << endl;
break;
case PRODUCER:
{
cout << name << " waiting to acquire mutex ..." << endl;
unique_lock<mutex> uniqueLocker( locker );
cout << name << " acquired mutex ..." << endl;
if ( 32000 == count ) count = 0;
appQueue.push ( ++ count );
ready = true;
uniqueLocker.unlock();
cout << name << " released mutex ..." << endl;
untilReady.notify_one();
cout << name << " notified conditional signal ..." << endl;
}
break;
}
}
}
void Thread::start( ) {
pThread = new thread ( &Thread::run, this );
}
void Thread::stop( ) {
stopped = true;
}
void Thread::join( ) {
pThread->join( );
}
void Thread::detach( ) {
pThread->detach( );
}
In the preceding Thread class, I used unique_lock<std::mutex>. The conditional_variable::wait() method expects unique_lock, hence I'm using unique_lock here. Now, unique_lock<std::mutex> supports ownership transfer, recursive locking, deferred locking, manual locking, and unlocking without deleting unique_lock, unlike lock_guard<std::mutex>. The lock_guard<std::mutex> instance immediately locks the mutex, and the mutex gets unlocked automatically when the lock_guard<std::mutex> instance goes out of the scope. However, lock_guard doesn't support manual unlocking.
Because we haven't created the unique_lock instance with the deferred locking option, unique_lock will lock the mutex immediately, just like lock_guard.
The Thread::run() method is our thread function. Depending on ThreadType supplied to the Thread constructor, the thread instance will behave either as the PRODUCER or CONSUMER thread.
The PRODUCER thread first locks the mutex and appends an integer to the queue, which is shared among PRODUCER and CONSUMER threads. Once the queue is updated, PRODUCER will unlock the mutex before notifying CONSUMER; otherwise, CONSUMER will not be able to acquire the mutex and receive the conditional variable signal.
The CONSUMER thread first acquires the mutex and then waits for the conditional variable signal. On receipt of the conditional signal, the CONSUMER thread retrieves the value from the queue and prints the value and resets the ready flag so that the process can be repeated until the application is terminated.
It is recommended to make use of unique_lock<std::mutex>, lock_guard<std::mutex>, or scoped_lock<std::mutex> to avoid deadlocks. At times, it is possible we may not unlock the mutex that leads to deadlocks; hence, the use of mutex directly isn't recommended.
Now lets look at the code in the main.cpp file:
#include "Thread.h"
int main ( ) {
Thread producer( ThreadType::PRODUCER );
Thread consumer( ThreadType::CONSUMER );
producer.start();
consumer.start();
producer.join();
consumer.join();
return 0;
}