Mastering C++ Multithreading by Maya Posch. Published by Packt Publishing, 2017.

Dispatcher

As the last item, we have the Dispatcher class itself:

    #pragma once
    #ifndef DISPATCHER_H
    #define DISPATCHER_H

    #include "abstract_request.h"
    #include "worker.h"

    #include <queue>
    #include <mutex>
    #include <thread>
    #include <vector>

    using namespace std;

    class Dispatcher {
        static queue<AbstractRequest*> requests;  // Requests waiting for a worker.
        static queue<Worker*> workers;            // Idle workers waiting for a request.
        static mutex requestsMutex;               // Guards the requests queue.
        static mutex workersMutex;                // Guards the workers queue.
        static vector<Worker*> allWorkers;        // Every worker, used when stopping.
        static vector<thread*> threads;           // Every thread, used when joining.

    public:
        static bool init(int workers);
        static bool stop();
        static void addRequest(AbstractRequest* request);
        static bool addWorker(Worker* worker);
    };

    #endif

Most of this will look familiar. As you will have surmised by now, this is a fully static class.

Moving on, its implementation is as follows:

    #include "dispatcher.h"

    #include <iostream>
    using namespace std;

    // Definitions of the static class members.
    queue<AbstractRequest*> Dispatcher::requests;
    queue<Worker*> Dispatcher::workers;
    mutex Dispatcher::requestsMutex;
    mutex Dispatcher::workersMutex;
    vector<Worker*> Dispatcher::allWorkers;
    vector<thread*> Dispatcher::threads;

    bool Dispatcher::init(int workers) {
        thread* t = nullptr;
        Worker* w = nullptr;
        for (int i = 0; i < workers; ++i) {
            // Each worker executes Worker::run() on its own thread.
            w = new Worker;
            allWorkers.push_back(w);
            t = new thread(&Worker::run, w);
            threads.push_back(t);
        }

        return true;
    }

After setting up the static class members, the init() function is defined. It starts the specified number of worker threads, keeping a reference to each worker and thread instance in their respective vector data structures. The matching stop() function looks like this:

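    bool Dispatcher::stop() {
        // Ask every worker to leave its run() loop.
        for (size_t i = 0; i < allWorkers.size(); ++i) {
            allWorkers[i]->stop();
        }

        cout << "Stopped workers.\n";

        // Wait for each worker thread to finish.
        for (size_t j = 0; j < threads.size(); ++j) {
            threads[j]->join();
            cout << "Joined threads.\n";
        }

        return true;  // Declared to return bool, so a value must be returned.
    }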

In the stop() function, each worker instance has its stop() function called. This will cause each worker thread to terminate, as we saw earlier in the Worker class description.

Finally, we wait for each thread to join (that is, finish) prior to returning.
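The stop-then-join sequence described here can be tried out in isolation. The following is a minimal sketch, not the book's code: SketchWorker and startStopAndJoin() are hypothetical names, the worker merely sleeps in its loop, and the threads are owned by value instead of through raw pointers.

```cpp
#include <atomic>
#include <chrono>
#include <cstddef>
#include <memory>
#include <thread>
#include <vector>

// Hypothetical stand-in for the Worker class: it only loops until stop()
// is called, which is enough to show the shutdown sequence.
class SketchWorker {
    std::atomic<bool> running{true};

public:
    void run() {
        while (running.load()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

    void stop() { running.store(false); }
};

// Starts `count` workers, asks each one to stop, then joins every thread.
// Returns the number of threads that were joined.
std::size_t startStopAndJoin(std::size_t count) {
    std::vector<std::unique_ptr<SketchWorker>> workers;
    std::vector<std::thread> threads;

    for (std::size_t i = 0; i < count; ++i) {
        workers.push_back(std::unique_ptr<SketchWorker>(new SketchWorker));
        threads.emplace_back(&SketchWorker::run, workers.back().get());
    }

    // First signal all workers, so that they can wind down in parallel.
    for (auto& worker : workers) {
        worker->stop();
    }

    // Then wait for each thread to finish.
    std::size_t joined = 0;
    for (auto& t : threads) {
        t.join();
        ++joined;
    }

    return joined;
}
```

Calling startStopAndJoin(4) starts four threads and returns once all four have been joined. Signaling every worker before joining any thread mirrors the two loops in stop(): no thread has to wait for an earlier one to be joined before it is told to finish.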

    void Dispatcher::addRequest(AbstractRequest* request) {
        workersMutex.lock();
        if (!workers.empty()) {
            // A worker is waiting: hand it the request and wake it up.
            Worker* worker = workers.front();
            worker->setRequest(request);
            condition_variable* cv = nullptr;
            worker->getCondition(cv);
            cv->notify_one();
            workers.pop();
            workersMutex.unlock();
        }
        else {
            // No worker is waiting: release the workers mutex first,
            // then queue the request under the requests mutex.
            workersMutex.unlock();
            requestsMutex.lock();
            requests.push(request);
            requestsMutex.unlock();
        }
    }

The addRequest() function is where things get interesting. In this function, a new request is added. What happens next depends on whether a worker thread is waiting for a new request or not. If no worker thread is waiting (worker queue is empty), the request is added to the request queue.

The use of mutexes ensures that the access to these queues occurs safely, as the worker threads will simultaneously try to access both queues as well.

An important gotcha to note here is the possibility of a deadlock. That is, a situation where two threads each hold the lock on a resource while waiting for the other thread to release its lock, so that neither of them can ever continue. Every situation where more than one mutex is used in a single scope holds this potential.

In this function, the potential for a deadlock lies between releasing the lock on the workers mutex and obtaining the lock on the requests mutex. If this function were to keep holding the workers mutex while trying to obtain the requests lock (when no worker thread is available), another thread could at that moment hold the requests mutex (looking for new requests to handle) while trying to obtain the workers mutex (finding no requests and adding itself to the workers queue). Each thread would then be waiting for a lock held by the other.

The solution here is simple: release a mutex before obtaining the next one. In a situation where it seems that more than one mutex lock has to be held, it is paramount to examine and test one's code for potential deadlocks. In this particular function, the workers mutex lock is explicitly released as soon as it is no longer needed, and always before the requests mutex lock is obtained, thus preventing a deadlock.
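As an illustration of this rule, here is a minimal sketch with two guarded queues. All names in it (firstMutex, secondMutex, the two queues, and both functions) are hypothetical and chosen only for this example. The first function releases one mutex before taking the next, as this section does. The second shows one standard alternative for cases where both locks really have to be held together: std::lock(), which acquires several mutexes using a deadlock-avoidance algorithm.

```cpp
#include <mutex>
#include <queue>

// Hypothetical globals standing in for two queues, each with its own mutex.
std::mutex firstMutex;
std::mutex secondMutex;
std::queue<int> firstQueue;
std::queue<int> secondQueue;

// Approach used in this section: never hold more than one mutex at a time.
bool moveOneLockAtATime() {
    int value = 0;
    {
        std::lock_guard<std::mutex> lock(firstMutex);
        if (firstQueue.empty()) {
            return false;
        }
        value = firstQueue.front();
        firstQueue.pop();
    }  // firstMutex is released here, before secondMutex is touched.

    std::lock_guard<std::mutex> lock(secondMutex);
    secondQueue.push(value);
    return true;
}

// Alternative for when both locks are needed at once: std::lock() obtains
// both mutexes without risking a deadlock, and the guards adopt ownership
// so that both are released automatically.
bool moveHoldingBothLocks() {
    std::lock(firstMutex, secondMutex);
    std::lock_guard<std::mutex> firstLock(firstMutex, std::adopt_lock);
    std::lock_guard<std::mutex> secondLock(secondMutex, std::adopt_lock);
    if (firstQueue.empty()) {
        return false;
    }
    secondQueue.push(firstQueue.front());
    firstQueue.pop();
    return true;
}
```

The first variant keeps each critical section short, at the cost of a brief moment in which the value is in neither queue. The second makes the move a single step for any other thread that uses the same two mutexes.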

Another important aspect of this particular section of code is the way it signals a worker thread. As one can see in the first section of the if/else block, when the workers queue is not empty, a worker is fetched from the queue, has the request set on it, and then has its condition variable referenced and signaled, or notified.

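The condition variable works together with the mutex we handed it before in the Worker class: the waiting thread holds that mutex, releases it for the duration of the wait, and re-acquires it before continuing. When the notify_one() function (generally called signal() in other APIs) is called on the condition variable, it wakes up one of the threads waiting on that condition variable, so that its wait call can return and the thread can continue. Which thread gets woken up is not specified when several are waiting; here, the condition variable is obtained from the worker itself, so it is that worker which is notified.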
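The wait and notify pairing can be sketched as follows. This is an illustrative example under its own assumptions rather than the Worker class from this chapter: Mailbox, waitForPayload(), deliver(), and roundTrip() are hypothetical names, and the waiting side uses the predicate form of wait() so that a notification sent before the wait starts is not lost.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical shared state: a flag and a value, guarded by one mutex.
struct Mailbox {
    std::mutex mtx;
    std::condition_variable cv;
    bool ready = false;
    int payload = 0;
};

// Waiting side: blocks until `ready` is set, then returns the payload.
int waitForPayload(Mailbox& box) {
    std::unique_lock<std::mutex> lock(box.mtx);
    // wait() releases the mutex while blocked and re-acquires it before
    // returning. The predicate also guards against spurious wake-ups.
    box.cv.wait(lock, [&box] { return box.ready; });
    return box.payload;
}

// Notifying side: updates the shared state under the mutex, then signals.
void deliver(Mailbox& box, int value) {
    {
        std::lock_guard<std::mutex> lock(box.mtx);
        box.payload = value;
        box.ready = true;
    }
    box.cv.notify_one();
}

// Runs one waiter thread and delivers a value to it.
int roundTrip(int value) {
    Mailbox box;
    int received = 0;
    std::thread waiter([&] { received = waitForPayload(box); });
    deliver(box, value);
    waiter.join();
    return received;
}
```

A call such as roundTrip(42) returns the delivered value regardless of whether the waiter thread reaches wait() before or after deliver() runs, because the flag is checked under the mutex.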

In the Worker class run() function, we would be waiting for this notification event. Upon receiving it, the worker thread continues and processes the new request. The worker reference is removed from the queue by the pop() call, and stays out of it until the worker adds itself again once it is done processing the request. This happens in the addWorker() function:

    bool Dispatcher::addWorker(Worker* worker) {
        bool wait = true;
        requestsMutex.lock();
        if (!requests.empty()) {
            // A request is pending: assign it right away, no need to wait.
            AbstractRequest* request = requests.front();
            worker->setRequest(request);
            requests.pop();
            wait = false;
            requestsMutex.unlock();
        }
        else {
            // Nothing pending: release the requests mutex first,
            // then add the worker to the queue of waiting workers.
            requestsMutex.unlock();
            workersMutex.lock();
            workers.push(worker);
            workersMutex.unlock();
        }

        return wait;
    }

With this last function, a worker thread will add itself to the queue once it is done processing a request. It is similar to the earlier function in that the incoming worker is first actively matched with any request that may be waiting in the request queue. If none is available, the worker is added to the worker queue.

It is important to note here that we return a Boolean value which indicates whether the calling thread should wait for a new request, or whether it already has received a new request while trying to add itself to the queue.
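The meaning of this return value can be checked with a stripped-down, single-threaded sketch of the matching logic. Everything in it is a simplifying assumption made for illustration: SketchWorker and SketchDispatcher are hypothetical names, a request is reduced to an int, std::lock_guard replaces the explicit lock() and unlock() calls, and the condition variable signaling is left out entirely.

```cpp
#include <mutex>
#include <queue>

// Hypothetical, simplified worker: a request is just an int, and -1 means
// that no request has been assigned.
struct SketchWorker {
    int request = -1;
    void setRequest(int value) { request = value; }
};

// Simplified dispatcher with the same matching logic as in this section.
// Assumption: no threads and no condition variable signaling.
class SketchDispatcher {
    std::queue<int> requests;
    std::queue<SketchWorker*> workers;
    std::mutex requestsMutex;
    std::mutex workersMutex;

public:
    void addRequest(int request) {
        {
            std::lock_guard<std::mutex> lock(workersMutex);
            if (!workers.empty()) {
                workers.front()->setRequest(request);
                workers.pop();
                return;
            }
        }  // workersMutex is released before requestsMutex is taken.

        std::lock_guard<std::mutex> lock(requestsMutex);
        requests.push(request);
    }

    // Returns true if the worker was queued and should wait,
    // false if it was handed a pending request right away.
    bool addWorker(SketchWorker* worker) {
        {
            std::lock_guard<std::mutex> lock(requestsMutex);
            if (!requests.empty()) {
                worker->setRequest(requests.front());
                requests.pop();
                return false;
            }
        }  // requestsMutex is released before workersMutex is taken.

        std::lock_guard<std::mutex> lock(workersMutex);
        workers.push(worker);
        return true;
    }
};

// A request is queued first: the worker gets it and must not wait.
bool requestArrivesFirst() {
    SketchDispatcher dispatcher;
    SketchWorker worker;
    dispatcher.addRequest(7);
    bool wait = dispatcher.addWorker(&worker);
    return !wait && worker.request == 7;
}

// The worker is queued first: it is told to wait, and a later request
// is handed to it directly.
bool workerArrivesFirst() {
    SketchDispatcher dispatcher;
    SketchWorker worker;
    bool wait = dispatcher.addWorker(&worker);
    dispatcher.addRequest(9);
    return wait && worker.request == 9;
}
```

Both helper functions return true, covering the two cases that the Boolean distinguishes: a worker that already holds a request when addWorker() returns, and a worker that has to wait until addRequest() assigns it one.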

While this code is less complex than that of the previous function, it still holds the same potential deadlock issue due to the handling of two mutexes within the same scope. Here, too, we first release the mutex we hold before obtaining the next one.