In essence, a condition variable provides a mechanism through which a thread's execution can be controlled by another thread. This is done by having a shared variable which a thread will wait for until signaled by another thread. It is an essential part of the scheduler implementation we looked at in Chapter 11, Thread Synchronization and Communication.
For the C++11 API, condition variables and their associated functionality are defined in the <condition_variable> header.
The basic usage of a condition variable can be summarized from that scheduler's code in Chapter 11, Thread Synchronization and Communication.
#include "abstract_request.h"
#include <condition_variable>
#include <mutex>
using namespace std;
class Worker {
condition_variable cv;
mutex mtx;
unique_lock<mutex> ulock;
AbstractRequest* request;
bool running;
bool ready;
public:
Worker() { running = true; ready = false; ulock = unique_lock<mutex>(mtx); }
void run();
void stop() { running = false; }
void setRequest(AbstractRequest* request) { this->request = request; ready = true; }
void getCondition(condition_variable* &cv);
};
In the constructor, as defined in the preceding Worker class declaration, we see the way a condition variable in the C++11 API is initialized. The steps are listed as follows:
- Create a condition_variable and mutex instance.
- Assign the mutex to a new unique_lock instance. With the constructor we use here for the lock, the assigned mutex is also locked upon assignment.
- The condition variable is now ready for use:
#include <chrono>
using namespace std;
void Worker::run() {
while (running) {
if (ready) {
ready = false;
request->process();
request->finish();
}
if (Dispatcher::addWorker(this)) {
while (!ready && running) {
if (cv.wait_for(ulock, chrono::seconds(1)) ==
cv_status::timeout) {
// We timed out, but we keep waiting unless the
worker is
// stopped by the dispatcher.
}
}
}
}
}
Here, we use the wait_for() function of the condition variable, and pass both the unique lock instance we created earlier and the amount of time which we want to wait for. Here we wait for 1 second. If we time out on this wait, we are free to re-enter the wait (as is done here) in a continuous loop, or continue execution.
It's also possible to perform a blocking wait using the simple wait() function, or wait until a certain point in time with wait_for().
As noted, when we first looked at this code, the reason why this worker's code uses the ready Boolean variable is to check that it was really another thread which signaled the condition variable, and not just a spurious wake-up. It's an unfortunate complication of most condition variable implementations--including the C++11 one--that they are susceptible to this.
As a result of these random wake-up events, it is necessary to have some way to ensure that we really did wake up intentionally. In the scheduler code, this is done by having the thread which wakes up the worker thread also set a Boolean value which the worker thread can wake up.
Whether we timed out, or were notified, or suffered a spurious wake-up can be checked with the cv_status enumeration. This enumeration knows these two possible conditions:
- timeout
- no_timeout
The signaling, or notifying, itself is quite straightforward:
void Dispatcher::addRequest(AbstractRequest* request) {
workersMutex.lock();
if (!workers.empty()) {
Worker* worker = workers.front();
worker->setRequest(request);
condition_variable* cv;
worker->getCondition(cv);
cv->notify_one();
workers.pop();
workersMutex.unlock();
}
else {
workersMutex.unlock();
requestsMutex.lock();
requests.push(request);
requestsMutex.unlock();
}
}
In this preceding function from the Dispatcher class, we attempt to obtain an available worker thread instance. If found, we obtain a reference to the worker thread's condition variable as follows:
void Worker::getCondition(condition_variable* &cv) {
cv = &(this)->cv;
}
Setting the new request on the worker thread also changes the value of the ready variable to true, allowing the worker to check that it is indeed allowed to continue.
Finally, the condition variable is notified that any threads which are waiting on it can now continue using notify_one(). This particular function will signal the first thread in the FIFO queue for this condition variable to continue. Here, only one thread will ever be notified, but if there are multiple threads waiting for the same condition variable, the calling of notify_all() will allow all threads in the FIFO queue to continue.