Chapter 14. Multithreading

C programs often perform several tasks simultaneously. For example, a program may:

  • Execute procedures that accomplish intermediate tasks in parallel and so improve performance

  • Process user input while carrying on time-consuming data communication or real-time operations “in the background”

Different tasks are performed simultaneously by the concurrent execution of parts of the program. Especially on modern multiprocessor systems—including multicore processors, of course—it is increasingly important for programs to take advantage of concurrency to use the system’s resources efficiently.

Until recently, C developers have had to depend on features of the operating system or appropriate libraries to implement concurrent execution. Now, however, the new C11 standard makes concurrency in C programming portable. C11 supports multithreaded execution, or multiple parallel paths of control flow within a process, and provides the same degree of concurrency as all modern operating systems. To this end, C11 defines an appropriate memory model and supports atomic operations. Support for multithreading and atomic operations are optional under the C11 standard, however. An implementation that conforms to C11 must simply define the macros __STDC_NO_THREADS__ and __STDC_NO_ATOMICS__ if it does not provide the corresponding features.

You may have already worked with the POSIX threads extension to C (called pthreads for short); that is, the library that implements multithreading in accordance with the Portable Operating System Interface for UNIX (POSIX) standard, IEEE 1003.1c. If so, you will find that the C11 threads programming interface is similar in most respects to the POSIX standard.

Threads

When you start a program, the operating system creates a new process in which the program is executed. A process consists of one or more threads. Each thread is a partial process that executes a sequence of instructions independently of other parts of the process. When the process begins, its main thread is active. From then on, any running thread can launch other threads. All threads that have been started but not yet ended are terminated when the process terminates—for example, by executing a return statement in the main() function or by calling the exit() function.

The system’s scheduler allocates the available CPU time among all runnable threads. Usually the scheduler is preemptive: that means it interrupts the thread being executed by a central processing unit (CPU) at brief intervals and assigns the CPU to a different thread for a time. As a result, threads appear to the user to be executed in parallel, even on a single-processor system. Truly simultaneous execution of several threads is only possible on a multiprocessor system, however.

Every process has its own address space in memory, and has other exclusive resources, such as open files. All the threads of a process inherit its resources. Most significantly, several threads in one process share the same address space. That makes task-switching within a process much simpler for the scheduler than switching to a different process.

However, each thread also has resources of its own that are necessary for task-switching between threads: these include stack memory and CPU registers. These allow each thread to process its own local data without interference between threads. In addition, a thread may also have thread-specific permanent memory (see “Thread-Local Objects and Thread-Specific Storage”).

Because the threads of a given process use the same address space, they share their global and static data. That means, however, that two different threads can access the same memory locations concurrently. This situation is called a data race in the C standard, or a race condition in popular parlance. To prevent inconsistencies in shared data, the programmer must explicitly synchronize different threads’ writing operations or reading and writing operations if they use the same locations in memory.

Creating Threads

The macro definitions and the declarations of types and functions to support multithreading are declared in the header threads.h. All of the identifiers that are directly related to threads begin with the prefix thrd_. For example, thrd_t is the type of an object that identifies a thread.

The function that creates and starts executing a new thread is called thrd_create(). One of its arguments names the function to be executed in the new thread. The complete prototype of thrd_create() is:

int thrd_create(thrd_t *thr, thrd_start_t func, void *arg);

The parameter func is a pointer to the function that the thread will execute, and the void pointer arg is used to pass an argument to that function. In other words, the new thread will perform the function call func(arg). The type of the func argument, thrd_start_t, is defined as int (*)(void*) (that is, a pointer to a function that takes a void pointer as its argument and returns an int), so the function that the thread carries out returns a value of the type int. The program can subsequently obtain this return value—waiting for the thread to finish if necessary—by calling the function thrd_join().

If it succeeds in starting a thread, the function thrd_create() writes the identification of the new thread in the object pointed to by the argument thr, and returns the value of the macro thrd_success.

In most cases, other operations later in the program depend on the results of the thread’s execution and can only be performed when it has finished. The function thrd_join() is used to ensure that a thread has finished. Its prototype is:

int thrd_join(thrd_t thr, int *result);

The thread that calls thrd_join() blocks—that is, it stops at that point in the program as long as necessary—until the thread identified by thr finishes. Then thrd_join() writes the return value of that thread’s function in the int variable that the pointer result refers to (unless result is a null pointer). Finally, thrd_join() releases any resources that belong to the thread.

If the program’s logic does not require it to wait for a thread to end, it should call the function:

int thrd_detach(thrd_t thr);

Then all of the thread’s resources will be released when the thread finishes. Once a thread has been detached, there is no way for the program to wait for it to end, nor to obtain the return value of the thread function. A program can call either thrd_join() or thrd_detach() no more than once for each thread created.

The program in Example 14-1 illustrates a way of processing an array using parallel operations. Separate threads first process parts of the array, and then their results are joined together. The program merely calculates the sum of a sequence of numbers.

The function sum() first determines the maximum size of a block of array elements from the number of threads to be created, and then calls the recursive helper function parallel_sum().

The parallel_sum() function divides the array into two halves and gives one half to a new thread to work on, and then calls itself to process the other half. As the example illustrates, several arguments needed by a thread function are generally grouped in a structure.

Example 14-1. Calculating the sum of array elements in several parallel threads
#include <stdbool.h>
#include <threads.h>

#define MAX_THREADS 8       // 1, 2, 4, 8 ... Maximum number
                            // of threads to create.
#define MIN_BLOCK_SIZE 100  // Minimum size of an array block.

typedef struct              // Arguments for the parallel_sum() function.
{
    float *start;           // Start and length of the
    int len;                // array block passed to parallel_sum().
    int block_size;         // Size of the smallest blocks.
    double sum;             // The result.
} Sum_arg;

int parallel_sum(void *arg);     // Prototype of the thread function.

// ---------------------------------------------------------------
// Calculate the sum of array elements and write it to *sumPtr.
// sum() calls the function parallel_sum() for parallel processing.
// Return value: true if no error occurs; otherwise, false.
bool sum(float arr[], int len, double* sumPtr)
{
    int block_size = len / MAX_THREADS;
    if (block_size < MIN_BLOCK_SIZE) block_size = len;

    Sum_arg args = { arr, len, block_size, 0.0 };
    if (parallel_sum(&args))
    {  *sumPtr = args.sum;   return true; }
    else
       return false;
}
// ---------------------------------------------------------------
// Recursive helper function to divide the work among several threads.
int parallel_sum(void *arg)
{
    Sum_arg *argp = (Sum_arg*)arg;      // A pointer to the arguments.

    if (argp->len <= argp->block_size)  // If length <= block_size,
    {                                   // add up the elements.
        for (int i = 0; i < argp->len; ++i)
            argp->sum += argp->start[i];
        return 1;
    }
    else                               // If length > block_size,
    {                                  // divide the array.
        int mid = argp->len / 2;
        Sum_arg arg2 = { argp->start+mid, argp->len-mid,
                         argp->block_size, 0};   // Specifies second half
        argp->len = mid;                         // Length of first half


        thrd_t th;                // Process first half in a new thread.
        int res = 0;
        if (thrd_create(&th, parallel_sum, arg) != thrd_success)
            return 0;             // Couldn't spawn a thread

        if (!parallel_sum(&arg2)) // Process second half by recursion
                                  // in the current thread.
        {
            thrd_detach(th); return 0;  // Recursive call failed
        }
        thrd_join(th, &res);
        if (!res)
           return 0;  // Sibling thread reported failure

        argp->sum += arg2.sum;
        return 1;
    }
}

Other Thread Functions

In addition to the thrd_create(), thrd_join(), and thrd_detach() functions described in the previous section, C11 provides five more functions for thread control:

thrd_t thrd_current(void);

This function returns the identification of the thread in which it is called.

int thrd_equal( thrd_t thr0, thrd_t thr1 );

Returns a nonzero value if the two thread identifiers refer to the same thread, or 0 if they refer to different threads.

int thrd_sleep( const struct timespec *duration,
                struct timespec *remaining );

Blocks the calling thread for the period specified by duration. The function returns earlier only if it receives a signal that is not being ignored (see “Signals”). In that case, the function saves the remaining countdown time in the object pointed to by remaining, provided remaining is not a null pointer. The pointers duration and remaining must not point to the same object.

The structure type timespec has two members, for storing seconds and nanoseconds:

time_t tv_sec;     // Seconds >= 0
long   tv_nsec;    // 0 <= nanoseconds <= 999999999

The order of the members in the structure is not specified. In the following example, the calling thread waits for at least 100 milliseconds unless interrupted by a signal:

struct timespec duration = {0};
duration.tv_nsec = 100*1E6;  // 100 milliseconds
                             // = 100,000,000 nanoseconds
thrd_sleep(&duration,NULL);  // Sleep for 100 milliseconds.

The function thrd_sleep() returns 0 if the countdown has expired, or –1 if it was interrupted by a signal. Other negative return values indicate errors.

void thrd_yield(void);

This function advises the operating system’s scheduler to interrupt the calling thread and give CPU time to another thread.

_Noreturn void thrd_exit(int result);

Ends the calling thread with the result result. Any function executed in the thread may call thrd_exit(). This function call is equivalent to the statement return result; in the thread function. Exiting the last remaining thread causes the program to exit normally; that is, as if the exit() function were called with the argument EXIT_SUCCESS.

Accessing Shared Data

If several threads access the same data and at least one of them modifies it, then all access to the shared data must be synchronized in order to prevent data races. Otherwise, a thread that reads shared data could interrupt another thread that is in the middle of modifying the same data, and would then read inconsistent values. Moreover, because the system may schedule the threads differently each time a program is executed, such errors only manifest themselves intermittently in running programs and are difficult to reproduce in testing. As the program in Example 14-2 illustrates, a data race can occur even in such a simple operation as incrementing a counter.

Example 14-2. Concurrent memory access without synchronization
#include <stdio.h>
#include <time.h>           // For clock() and CLOCKS_PER_SEC
#include <threads.h>

#define COUNT 10000000L

long counter = 0;
int incFunc(void *arg) {  for (long i = 0; i < COUNT; ++i)  ++counter;  return 0; }
int decFunc(void *arg) {  for (long i = 0; i < COUNT; ++i)  --counter;  return 0; }

int main(void)
{
    clock_t cl = clock();
    thrd_t th1, th2;
    if (thrd_create(&th1, incFunc, NULL) != thrd_success
      || thrd_create(&th2, decFunc, NULL) != thrd_success)
    {
        fprintf(stderr,"Error creating thread\n"); return -1;
    }
    thrd_join(th1, NULL);
    thrd_join(th2, NULL);

    printf("Counter: %ld \t", counter);
    printf("CPU time: %ld ms\n", (clock()-cl)*1000L/CLOCKS_PER_SEC);
    return 0;
}

The counter should be 0 when the program ends. However, without synchronization, that is not the case: the final counter value is different each time the program runs. Here is a typical output sample:

Counter: -714573        CPU time: 59 ms

To permit synchronization, the C library provides mutex operations and atomic operations.

Mutual Exclusion

The technique of mutual exclusion, or mutex for short, is used to prevent several threads from accessing shared resources at the same time. The name mutex is given to an object used to control exclusive access authorization. Together with condition variables, mutexes permit extensive control of synchronized access. For example, they allow you to specify the order in which data access operations must occur.

In C programs, a mutex is represented by an object of the type mtx_t that can be locked by only one thread at a time, while other threads must wait until it is unlocked. All of the declarations pertaining to operations on mutexes are contained in the header threads.h. The most important mutex functions are:

int mtx_init(mtx_t *mtx, int mutextype);

Creates a mutex with the properties specified by mutextype. If it succeeds in creating a new mutex, the function mtx_init() writes the ID of the new mutex in the object pointed to by the argument mtx, and returns the value of the macro thrd_success.

The argument mutextype can have one of the following four values:

mtx_plain
mtx_timed
mtx_plain | mtx_recursive
mtx_timed | mtx_recursive

The value mtx_plain requests a simple mutex that supports neither timeouts nor recursion; the other values specify timeout and/or recursion support.

void mtx_destroy(mtx_t *mtx);

Destroys the mutex pointed to by mtx, releasing all its resources.

int mtx_lock(mtx_t *mtx);

Blocks the calling thread until it obtains the mutex specified by mtx. The calling thread must not already hold the mutex unless the mutex supports recursion. If the call succeeds in obtaining the mutex, it returns the value of thrd_success. Otherwise, it returns thrd_error.

int mtx_unlock(mtx_t *mtx);

Releases the mutex referred to by mtx. The caller must hold the mutex before calling mtx_unlock(). If the call succeeds in releasing the mutex, it returns the value of thrd_success. Otherwise, it returns thrd_error.

The complementary functions mtx_lock() and mtx_unlock() are called at the beginning and end of a critical section of code which only one thread at a time must execute. Two alternatives to mtx_lock() are the functions mtx_trylock(), which obtains the mutex if it happens to be free but doesn’t block if it is not, and mtx_timedlock(), which only blocks until a specified time. All of these functions indicate by their return value whether the call succeeded in obtaining the mutex.

The program in Example 14-3 is a modification of Example 14-2 and shows how to use a mutex to eliminate the data race for the variable counter.

Example 14-3. Adding a mutex to the program in Example 14-2
#include <stdio.h>
#include <threads.h>

#define COUNT 10000000L

long counter = 0;
mtx_t mtx;                     // A mutex for access to counter

int incFunc(void *arg)
{
    for (long i = 0; i < COUNT; ++i)
    {  mtx_lock(&mtx);  ++counter;  mtx_unlock(&mtx); }
    return 0;
}
int decFunc(void *arg)
{
    for (long i = 0; i < COUNT; ++i)
    {  mtx_lock(&mtx);  --counter;  mtx_unlock(&mtx); }
    return 0;
}

int main(void)
{
    if (mtx_init(&mtx, mtx_plain) != thrd_success)
    {
        fprintf(stderr, "Error initializing the mutex.\n");
        return -1;
    }
    //
    // As in Example 14-2: start threads, wait for them to finish,
    // print output.
    //
    mtx_destroy(&mtx);
    return 0;
}

The functions incFunc() and decFunc() can no longer access counter concurrently, as only one of them can lock the mutex at a time. (Error checking has been omitted for the sake of readability.) Now the counter has the correct value, 0, at the end of the program. Here is a typical output sample:

Counter: 0        CPU time: 650 ms

Synchronization works, but at a price. The higher CPU time shows that the program now takes about ten times as long to run. The reason is that synchronization by locking a mutex is a much more complex operation than incrementing and decrementing a variable. Better performance can be achieved using atomic objects in cases where they obviate the need for a mutex lock.

Atomic Objects

An atomic object is an object that can be read or modified by means of atomic operations; that is, by operations that cannot be interrupted by a concurrent thread. You can declare an atomic object using the type qualifier _Atomic, introduced in C11 (unless the implementation defines the macro __STDC_NO_ATOMICS__). For example, the counter variable in the program in Example 14-2 can be made atomic by declaring it as follows:

_Atomic long counter = ATOMIC_VAR_INIT(0L);

This declaration defines the atomic long variable counter and initializes it with the value 0. The macro ATOMIC_VAR_INIT and all the other macros, types, and declarations for using atomic objects are found in the header stdatomic.h. In particular, stdatomic.h defines abbreviations for atomic types corresponding to all the integer types. For example, the type atomic_uchar is equivalent to _Atomic unsigned char.

The syntax _Atomic(T) can also be used to specify the atomic type corresponding to a given non-atomic type T. Array and function types cannot be atomic, however. An atomic type may have a different size and alignment from those of the corresponding non-atomic type.

Atomic Operations

Reading or writing an atomic object is an atomic operation; that is, an operation that cannot be interrupted. That means that different threads can access an atomic object concurrently without causing a race condition. For every atomic object, all modifications of the object are performed in a definite global order, which is called its modification order.

An atomic object with a structure or union type should only be read or written as a whole: for safe access to individual members, the atomic structure or union should first be copied to an equivalent non-atomic object.

Note that the initialization of an atomic object, whether using the macro ATOMIC_VAR_INIT or by the generic function atomic_init(), is not an atomic operation.

Atomic operations are typically carried out as read-modify-write operations. For example, the postfix increment and decrement operators ++ and --, when applied to an atomic object, are atomic read-modify-write operations. Likewise, the compound assignment operators, such as +=, work atomically when their left operand is an atomic object. The program in Example 14-2 can be made to deliver the correct final counter value 0, without any other changes, by declaring the variable counter as atomic. The program’s timekeeping shows that the version with an atomic counter variable is more than twice as fast as the version using a mutex in Example 14-3.

In addition to the operators already mentioned, there are a number of functions to perform atomic operations, including atomic_store(), atomic_exchange(), and atomic_compare_exchange_strong(). You will find an overview of this group of functions in Chapter 17, and a detailed description of each one in Chapter 18.

An atomic type has the lock-free property if atomic access to an object of this type can be realized without using lock and unlock operations. Only the type atomic_flag, a structure type that can represent the two states “set” and “cleared”, is guaranteed to have the lock-free property. The macro ATOMIC_FLAG_INIT initializes an atomic_flag object in the “cleared” state, as in the following declaration, for example:

atomic_flag done = ATOMIC_FLAG_INIT;

To perform the customary flag operations on an atomic_flag object, C11 provides the functions atomic_flag_test_and_set() and atomic_flag_clear(). The integer atomic types are usually also lock-free. To determine whether a given type is actually lock-free, a program can check the value of a macro of the form ATOMIC_type_LOCK_FREE, where type is a capitalized abbreviation for a specific integer type, such as BOOL, INT, or LLONG. The corresponding macro for pointer types is ATOMIC_POINTER_LOCK_FREE. All of these macros yield values of 0, 1, or 2. The value 0 means that the type is never lock-free; 1 means it is lock-free for certain objects; and 2 means it is always lock-free. Alternatively, you can find out whether a given atomic object is lock-free by calling the generic function:

_Bool atomic_is_lock_free(const volatile A *obj);

The placeholder A in the function’s parameter declaration stands for any atomic type. The argument obj is thus a pointer to any given atomic object.

Memory Ordering

In optimizing program code, compilers and processors are free to rearrange the order of any instructions that are not interdependent. For example, the two assignment statements a = 0; b = 1; can be executed in either order. In a multithreading environment, however, such optimizations can lead to errors, because dependencies between memory operations in different threads are ordinarily not visible to the compiler or processor.

Using atomic objects prevents such reordering by default. Preventing an optimization may mean sacrificing speed, however. Experienced programmers can improve performance by explicitly using atomic operations with lower memory-ordering requirements. For each function that performs an atomic operation (such as atomic_store(), for example), there is also a version that takes an additional argument of the type memory_order. These functions have names that end in _explicit, such as atomic_store_explicit().

The memory_order type is an enumeration that defines the following constants to specify the given memory ordering requirements:

memory_order_relaxed

The caller specifies that there are no memory order requirements, so that the compiler is free to change the order of operations.

memory_order_release

Write access to an atomic object A performs a release operation. The effect of the release operation is that all the preceding memory access operations in the given thread are visible to another thread that performs an acquire operation on A.

memory_order_acquire

A read operation on an atomic object performs an acquire operation. That ensures that subsequent memory access operations are not rearranged to occur before this function call.

memory_order_consume

A consume operation is less restrictive than an acquire operation: it prevents the reordering only of subsequent memory access operations that depend directly on the atomic variable read.

memory_order_acq_rel

Performs both an acquire and a release operation.

memory_order_seq_cst

The request for sequential consistency includes the acquire and release operations of memory_order_acq_rel. In addition, it also specifies that all operations that are so qualified are performed in an absolute order that conforms to the modification order of the atomic objects involved. Sequential consistency is the default memory order requirement that is applied to all atomic operations if no lower requirement is explicitly specified.

In the program in Example 14-2, modified to declare counter as atomic, the incrementation and decrementation of the counter are performed independently of other operations so that no memory order specifications are necessary. In other words, in place of the statement

++counter;            // Implies memory_order_seq_cst

the following statement is sufficient, and allows the compiler to perform more optimization:

atomic_fetch_add_explicit( &counter, 1, memory_order_relaxed );

Release and acquire operations are an efficient way to establish a happens-before relation between instructions. In other words, as the following example illustrates, the _explicit functions ensure that a given operation B is only executed after another thread has completed an operation A:

struct Data *dp = NULL, data;
atomic_intptr_t aptr = ATOMIC_VAR_INIT(0);

// Thread 1:
   data = ...;                        // Operation A
   atomic_store_explicit( &aptr, (intptr_t)&data,
                          memory_order_release );

// Thread 2:
   dp = (struct Data*)atomic_load_explicit( &aptr,
                                            memory_order_acquire );
   if( dp != NULL)
      // Process the data at *dp
                                      // Operation B
   else
      // Data at *dp not available yet.

Synchronization using a mutex also implies an acquire operation when the mutex is locked, and a release operation when it is unlocked. That means that if a thread T1 uses a mutex to protect an operation A, and another thread T2 uses the same mutex to protect an operation B, then operation A will be executed completely before operation B if T1 locks the mutex first. Conversely, if T2 locks the mutex first, then all the modifications performed by operation B will be visible to thread T1 when T1 executes operation A.

Fences

The memory order requirements for an atomic operation can also be specified separately from an atomic operation. This technique is called establishing a fence or memory barrier. To set a fence, C11 provides the function:

void atomic_thread_fence(memory_order order);

If the argument’s value is memory_order_release, the function establishes a release fence. In this case, the atomic write operations must occur after the release fence.

The atomic_thread_fence() function establishes an acquire fence if its argument’s value is memory_order_acquire or memory_order_consume. The atomic read operations must occur before the acquire fence.

If the argument’s value is memory_order_relaxed, the function has no effect. The argument values memory_order_acq_rel and memory_order_seq_cst each specify both a release and an acquire fence.

Fences permit a greater degree of memory-order optimization. In our previous example, an acquire operation in the if branch is sufficient to synchronize the thread operations:

// Thread 2:
   dp = (struct Data*)atomic_load_explicit( &aptr,
                                            memory_order_relaxed );
   if( dp != NULL)
   {
      atomic_thread_fence(memory_order_acquire);
      // Operation B:
      // Process the data at *dp.
   }
   else
      // Data at *dp not available yet.

Communication Between Threads: Condition Variables

The C11 standard provides condition variables for communication between threads. Threads can use condition variables to wait for a notification from another thread indicating that a certain condition is fulfilled. Such a notification may mean that certain data are ready for processing, for example.

A condition variable is represented by an object of the type cnd_t, and is used in conjunction with a mutex. The general procedure is as follows: The thread obtains the mutex and tests the condition. If the condition is not fulfilled, the thread waits on the condition variable—releasing the mutex—until another thread wakes it up. Then the thread obtains the mutex and tests the condition again. This procedure is repeated until the condition is fulfilled.

The functions for working with condition variables, declared in the header threads.h, are as follows:

int cnd_init(cnd_t *cond);

Initializes the condition variable pointed to by cond.

void cnd_destroy(cnd_t *cond);

Frees all the resources used by the specified condition variable.

int cnd_signal(cnd_t *cond);

Wakes up one of any number of threads that are waiting for the specified condition variable.

int cnd_broadcast(cnd_t *cond);

Wakes up all the threads waiting for the specified condition variable.

int cnd_wait(cnd_t *cond, mtx_t *mtx);

Blocks the calling thread and releases the specified mutex. A thread must hold the mutex before calling cnd_wait(). If another thread unblocks the caller by sending a signal—that is, by specifying the same condition variable as the argument to a cnd_signal() or cnd_broadcast() call—then the thread that has called cnd_wait() obtains the mutex again before cnd_wait() returns.

int cnd_timedwait(cnd_t *restrict cond, mtx_t *restrict mtx,
                                     const struct timespec *restrict ts);

Like cnd_wait(), cnd_timedwait() blocks the thread that calls it, but only until the time specified by the argument ts. A struct timespec object representing the current time can be obtained by calling the function timespec_get().

All of the condition variable functions except cnd_destroy() return the value of thrd_error if they incur an error, and otherwise thrd_success. The function cnd_timedwait() can also return the value of thrd_timedout if it returns when the time limit has been reached.

The program in Examples 14-4 and 14-5 illustrates the use of condition variables in the common “producer-consumer” model. The program starts a new thread for each producer and for each consumer. A producer puts a new product—in our case, an int value—in a ring buffer, provided the buffer is not full, and signals waiting consumers that a product is available. Each consumer takes products from the buffer, if available, and signals the fact to waiting producers.

Only one thread can modify the ring buffer at any given time. Thread synchronization therefore takes place in the functions bufPut(), which inserts an element in the buffer, and bufGet(), which removes an element from it. There are two condition variables: a producer waits on one of them if the buffer is full, and a consumer waits on the other if the buffer is empty. All the necessary elements of the buffer are contained in the structure Buffer. The bufInit() function initializes a Buffer object with a specified size, and the bufDestroy() function destroys it.

Example 14-4. A ring buffer for the producer-consumer model
/* buffer.h
 * Declarations for a thread-safe buffer.
 */
#include <stdbool.h>
#include <threads.h>

typedef struct Buffer
{
    int *data;              // Pointer to the array of data.
    size_t size, count;     // Maximum and current numbers of elements.
    size_t tip, tail;       // tip = index of the next free spot.
    mtx_t mtx;              // A mutex and
    cnd_t cndPut, cndGet;   // two condition variables.
} Buffer;

bool bufInit( Buffer *bufPtr, size_t size );
void bufDestroy(Buffer *bufPtr);

bool bufPut(Buffer *bufPtr, int data);
bool bufGet(Buffer *bufPtr, int *dataPtr, int sec);

/* -------------------------------------------------------------
 * buffer.c
 * Definitions of functions operating on Buffer.
 */
#include "buffer.h"
#include <stdlib.h>          // For malloc() and free()

bool bufInit( Buffer *bufPtr, size_t size)
{
    if ((bufPtr->data = malloc( size * sizeof(int))) == NULL)
       return false;
    bufPtr->size = size;
    bufPtr->count = 0;
    bufPtr->tip = bufPtr->tail = 0;
    return    mtx_init( &bufPtr->mtx, mtx_plain) == thrd_success
           && cnd_init( &bufPtr->cndPut) == thrd_success
           && cnd_init( &bufPtr->cndGet) == thrd_success;
}

void bufDestroy(Buffer *bufPtr)
{
    cnd_destroy( &bufPtr->cndGet );
    cnd_destroy( &bufPtr->cndPut );
    mtx_destroy( &bufPtr->mtx );
    free( bufPtr->data );
}

// Insert a new element in the buffer:
bool bufPut(Buffer *bufPtr, int data)
{
    mtx_lock( &bufPtr->mtx );

    while (bufPtr->count == bufPtr->size)
       if (cnd_wait( &bufPtr->cndPut, &bufPtr->mtx ) != thrd_success)
       {
          mtx_unlock( &bufPtr->mtx );   // Never return holding the lock.
          return false;
       }

    bufPtr->data[bufPtr->tip] = data;
    bufPtr->tip = (bufPtr->tip + 1) % bufPtr->size;
    ++bufPtr->count;

    mtx_unlock( &bufPtr->mtx );
    cnd_signal( &bufPtr->cndGet );

    return true;
}

// Remove an element from the buffer. If the buffer is empty,
// wait no more than sec seconds.
bool bufGet(Buffer *bufPtr, int *dataPtr, int sec)
{
    struct timespec ts;
    timespec_get( &ts, TIME_UTC );   // The current time
    ts.tv_sec  += sec;               // + sec seconds delay.

    mtx_lock( &bufPtr->mtx );

    while ( bufPtr->count == 0 )
       if (cnd_timedwait(&bufPtr->cndGet,
                         &bufPtr->mtx, &ts) != thrd_success)
       {
          mtx_unlock( &bufPtr->mtx );   // Timeout or error: release the lock.
          return false;
       }

    *dataPtr = bufPtr->data[bufPtr->tail];
    bufPtr->tail = (bufPtr->tail + 1) % bufPtr->size;
    --bufPtr->count;

    mtx_unlock( &bufPtr->mtx );
    cnd_signal( &bufPtr->cndPut );

    return true;
}

The corresponding main() function, shown in Example 14-5, creates a buffer and starts several producer and consumer threads, giving each of them an identification number and a pointer to the buffer. Each producer thread creates a certain number of “products” and then quits with a return statement. A consumer thread returns if it is unable to get a product to consume within a certain delay.

Example 14-5. Starting the producer and consumer threads
// producer_consumer.c
#include "buffer.h"
#include <stdio.h>
#include <stdlib.h>

#define NP 2                        // Number of producers
#define NC 3                        // Number of consumers

int producer(void *);               // The thread functions.
int consumer(void *);

struct Arg { int id; Buffer *bufPtr; };  // Arguments for the
                                         // thread functions.
_Noreturn void errorExit(const char* msg)
{
    fprintf(stderr, "%s\n", msg); exit(0xff);
}

int main(void)
{
    printf("Producer-Consumer Demo\n\n");
    Buffer buf;                          // Create a buffer for
    if (!bufInit( &buf, 5 ))             // five products.
       errorExit("Buffer error.");

    thrd_t prod[NP], cons[NC];           // The threads and
    struct Arg prodArg[NP], consArg[NC]; // their arguments.
    int i = 0, res = 0;

    for ( i = 0; i < NP; ++i )           // Start the producers.
    {
       prodArg[i].id = i+1, prodArg[i].bufPtr = &buf;
       if (thrd_create( &prod[i], producer, &prodArg[i] ) != thrd_success)
          errorExit("Thread error.");
    }

    for ( i = 0; i < NC; ++i )           // Start the consumers.
    {
       consArg[i].id = i+1, consArg[i].bufPtr = &buf;
       if ( thrd_create( &cons[i], consumer, &consArg[i] ) != thrd_success)
          errorExit("Thread error.");
    }

    for ( i = 0; i < NP; ++i )    // Wait for the threads to finish.
      thrd_join(prod[i], &res),
      printf("\nProducer %d ended with result %d.\n", prodArg[i].id, res);

    for ( i = 0; i < NC; ++i )
       thrd_join(cons[i], &res),
       printf("Consumer %d ended with result %d.\n", consArg[i].id, res);

    bufDestroy( &buf );
    return 0;
}

int producer(void *arg)       // The producers' thread function.
{
    struct Arg *argPtr = (struct Arg *)arg;
    int id = argPtr->id;
    Buffer *bufPtr = argPtr->bufPtr;

    int count = 0;
    for (int i = 0; i < 10; ++i)
    {
        int data = 10*id + i;
        if (bufPut( bufPtr, data ))
            printf("Producer %d produced %d\n", id, data), ++count;
        else
        { fprintf( stderr,
                 "Producer %d: error storing %d\n", id, data);
          return -id;
        }
    }
    return count;
}

int consumer(void *arg)       // The consumers' thread function.
{
    struct Arg *argPtr = (struct Arg *)arg;
    int id = argPtr->id;
    Buffer *bufPtr = argPtr->bufPtr;

    int count = 0;
    int data = 0;
    while (bufGet( bufPtr, &data, 2 ))
    {
        ++count;
        printf("Consumer %d consumed %d\n", id, data);
    }
    return count;
}

Thread-Local Objects and Thread-Specific Storage

Thread-local objects and thread-specific storage are two techniques by which each thread can maintain separate data while using global identifiers for its variables. They allow functions that are executed in a given thread to share data without incurring conflicts, even when other threads are executing the same functions.

Using Thread-Local Objects

A global or static object whose declaration contains the new storage class specifier _Thread_local is a thread-local object. That means that each thread possesses its own instance of the object, which is created and initialized when the thread starts. The object’s storage duration lasts as long as the thread runs. In expressions, the object’s name always refers to the local instance of the object that belongs to the thread evaluating the expression.

The specifier _Thread_local can be used together with one of the specifiers static or extern. The header threads.h defines thread_local as a synonym for _Thread_local. In Example 14-6, the main thread and the newly started thread each have an instance of the thread-local variable var.

Example 14-6. Using a thread-local object
#include <stdio.h>
#include <threads.h>

thread_local int var = 10;

void print_var(void){ printf("var = %d\n", var); }
int func(void *);          // Thread function

int main(int argc, char *argv[])
{
   thrd_t th1;
   if ( thrd_create( &th1, func, NULL ) != thrd_success ){
     fprintf(stderr,"Error creating thread.\n"); return 0xff;
   }
   print_var();            // Output: var = 10
   thrd_join(th1, NULL);
   return 0;
}

int func(void *arg)        // Thread function
{
    var += 10;             // Thread-local variable
    print_var();           // Output: var = 20
    return 0;
}
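The specifier can also appear on an object with block scope, provided static (or extern) appears with it. Each thread then gets its own instance of the local static variable, which keeps its value across calls within that thread. A minimal sketch (the function nextId() is our own illustration):

```c
#include <threads.h>

// Static storage duration, but one instance per thread:
// each thread counts its own calls to nextId().
int nextId(void)
{
    static thread_local int counter = 0;
    return ++counter;
}
```

A newly started thread begins with its own counter initialized to 0, no matter how often other threads have already called the function.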

Using Thread-Specific Storage

The technique of thread-specific storage is much more flexible than thread-local objects. The individual threads can use different amounts of storage, for example. They can dynamically allocate memory, and free it again by calling a destructor function. At the same time, the individual threads’ distinct memory blocks can be accessed using the same identifiers.

This flexibility is achieved by initially creating a global key that represents a pointer to thread-specific storage. The individual threads can then load this pointer with the location of their thread-specific storage. The key is an object of the type tss_t. The header threads.h contains this type definition and the declarations of four functions for managing thread-specific storage (abbreviated TSS):

int tss_create(tss_t *key, tss_dtor_t dtor);

Generates a new TSS pointer with the destructor dtor and sets the object pointed to by key to a value that uniquely identifies the pointer. The type tss_dtor_t is a function pointer, defined as void (*)(void*) (that is, a pointer to a function that takes one void pointer argument and has no return value). The value of dtor may be a null pointer.

void tss_delete(tss_t key);

Frees all the resources used by the TSS key key.

int tss_set(tss_t key, void *val);

Sets the TSS pointer identified by key, for the thread that calls tss_set(), to the memory block addressed by val.

void *tss_get(tss_t key);

Returns a pointer to the memory block that the calling thread has set by calling tss_set(). If an error occurs, tss_get() returns NULL.

The functions tss_create() and tss_set() return thrd_error if they incur an error; otherwise, thrd_success.

The program in Example 14-7 stores the name of a thread in dynamically allocated thread-specific memory.

Example 14-7. Using thread-specific storage
#include <threads.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

tss_t key;      // Global key for a TSS pointer

int thFunc(void *arg);        // Thread function
void destructor(void *data);  // Destructor function

int main(void)
{
   thrd_t th1, th2;
   int result1 = 0, result2 = 0;

   // Create the TSS key:
   if (tss_create(&key, destructor) != thrd_success)
     return -1;

   // Create threads:
   if (thrd_create(&th1, thFunc, "Thread_1") != thrd_success
        || thrd_create(&th2, thFunc, "Thread_2") != thrd_success)
       return -2;

   thrd_join(th1, &result1);   thrd_join(th2, &result2);
   if ( result1 != 0 || result2 != 0 )
      fputs("Thread error\n", stderr);
   else
      puts("Threads finished without error.");

   tss_delete(key);  // Free all resources of the TSS pointer.
   return 0;
}

void print(void)     // Display thread-specific storage.
{
  printf( "print: %s\n", (char*)tss_get(key) );
}

int thFunc( void *arg )
{
   char *name = (char*)arg;
   char *buf = malloc( strlen(name)+1 );

   // Set thread-specific storage:
   if ( buf == NULL || tss_set(key, buf) != thrd_success )
   {  free(buf);  return -1;  }     // free(NULL) is harmless.
   // Store data:
   strcpy( buf, name );
   print();
   return 0;
}

void destructor(void *data)
{
  printf("Destructor for %s\n", (char*)data);
  free(data);           // Release memory.
}
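In Example 14-7 the key is created in main() before any thread starts. When no such central place exists, the key can be created lazily with call_once() and a once_flag, which &lt;threads.h&gt; also provides, so that the first thread to need the storage creates the key exactly once. A sketch of this pattern, with names of our own choosing (threadBuffer(), logKey, initLogKey()):

```c
#include <threads.h>
#include <stdlib.h>

static tss_t logKey;                             // Key for a per-thread buffer.
static once_flag logKeyOnce = ONCE_FLAG_INIT;

static void initLogKey(void) { tss_create( &logKey, free ); }

// Return the calling thread's 256-byte buffer, allocating it on first use.
char *threadBuffer(void)
{
    call_once( &logKeyOnce, initLogKey );        // Create the key exactly once.
    char *buf = tss_get( logKey );
    if (buf == NULL)                             // First call in this thread:
    {
        buf = malloc( 256 );
        if (buf != NULL && tss_set( logKey, buf ) != thrd_success)
        {  free( buf );  buf = NULL;  }
    }
    return buf;
}
```

Because the destructor registered with tss_create() is free(), each thread's buffer is released automatically when that thread terminates.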