Next is the Worker class. This contains the logic which will be called by Dispatcher in order to process a request.
#pragma once
#ifndef WORKER_H
#define WORKER_H
#include "abstract_request.h"
#include <condition_variable>
#include <mutex>
using namespace std;
class Worker {
condition_variable cv;
mutex mtx;
unique_lock<mutex> ulock;
AbstractRequest* request;
bool running;
bool ready;
public:
Worker() { running = true; ready = false; ulock = unique_lock<mutex>(mtx); }
void run();
void stop() { running = false; }
void setRequest(AbstractRequest* request) { this->request = request; ready = true; }
void getCondition(condition_variable* &cv);
};
#endif
Whereas the adding of a request to Dispatcher does not require any special logic, the Worker class does require the use of condition variables to synchronize itself with the dispatcher. For the C++11 threads API, this requires a condition variable, a mutex, and a unique lock.
The unique lock encapsulates the mutex, and will ultimately be used with the condition variable as we will see in a moment.
Beyond this, we define methods to start and stop the worker, to set a new request for processing, and to obtain access to its internal condition variable.
Moving on, the rest of the implementation is written as follows:
#include "worker.h"
#include "dispatcher.h"
#include <chrono>
using namespace std;
void Worker::getCondition(condition_variable* &cv) {
cv = &(this)->cv;
}
void Worker::run() {
while (running) {
if (ready) {
ready = false;
request->process();
request->finish();
}
if (Dispatcher::addWorker(this)) {
// Use the ready loop to deal with spurious wake-ups.
while (!ready && running) {
if (cv.wait_for(ulock, chrono::seconds(1)) == cv_status::timeout) {
// We timed out, but we keep waiting unless
// the worker is
// stopped by the dispatcher.
}
}
}
}
}
Beyond the getter function for the condition variable, we define the run() function, which dispatcher will run for each worker thread upon starting it.
Its main loop merely checks that the stop() function hasn't been called yet, which would have set the running Boolean value to false, and ended the work thread. This is used by Dispatcher when shutting down, allowing it to terminate the worker threads. Since Boolean values are generally atomic, setting and checking can be done simultaneously without risk or requiring a mutex.
Moving on, the check of the ready variable is to ensure that a request is actually waiting when the thread is first run. On the first run of the worker thread, no request will be waiting, and thus, attempting to process one would result in a crash. Upon Dispatcher setting a new request, this Boolean variable will be set to true.
If a request is waiting, the ready variable will be set to false again, after which the request instance will have its process() and finish() functions called. This will run the business logic of the request on the worker thread's thread, and finalize it.
Finally, the worker thread adds itself to the dispatcher using its static addWorker() function. This function will return false if no new request is available, and cause the worker thread to wait until a new request has become available. Otherwise, the worker thread will continue with the processing of the new request that Dispatcher will have set on it.
If asked to wait, we enter a new loop. This loop will ensure that when the condition variable is woken up, it is because we got signaled by Dispatcher (ready variable set to true), and not because of a spurious wake-up.
Last of all, we enter the actual wait() function of the condition variable using the unique lock instance we created before along with a timeout. If a timeout occurs, we can either terminate the thread, or keep waiting. Here, we choose to do nothing and just re-enter the waiting loop.