23 typedef struct pipe_tag {
24 pthread_mutex_t mutex; /* Mutex to protect pipe */
25 stage_t *head; /* First stage */
26 stage_t *tail; /* Final stage */
27 int stages; /* Number of stages */
28 int active; /* Active data elements */
29 } pipe_t;
9-17 Each stage of a pipeline is represented by a variable of type stage_t. stage_t contains a mutex to synchronize access to the stage. The avail condition variable is used to signal a stage that data is ready for it to process, and each stage signals its own ready condition variable when it is ready for new data. The data member is the data passed from the previous stage, thread is the thread operating this stage, and next is a pointer to the following stage.
23-29 The pipe_t structure describes a pipeline. It provides pointers to the first and last stage of a pipeline. The first stage, head, represents the first thread in the pipeline. The last stage, tail, is a special stage_t that has no thread—it is a place to store the final result of the pipeline.
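The layout described above can be sketched without starting any threads. This is only an illustration under stated assumptions: the stage_t fields are restated from the description, and layout_build, layout_hops, and layout_free are hypothetical helpers that are not part of pipe.c.

```c
#include <pthread.h>
#include <stdlib.h>

/* stage_t and pipe_t restated from the description (assumed layout). */
typedef struct stage_tag {
    pthread_mutex_t   mutex;      /* Protect data */
    pthread_cond_t    avail;      /* Data available */
    pthread_cond_t    ready;      /* Ready for data */
    int               data_ready; /* Data present */
    long              data;       /* Data to process */
    pthread_t         thread;     /* Unused here: no threads started */
    struct stage_tag *next;       /* Next stage */
} stage_t;

typedef struct pipe_tag {
    pthread_mutex_t mutex;
    stage_t *head;
    stage_t *tail;
    int stages;
    int active;
} pipe_t;

/* Hypothetical: link `stages` worker stages plus one result stage. */
int layout_build (pipe_t *pipe, int stages)
{
    stage_t **link = &pipe->head;
    stage_t *stage = NULL;
    int i;

    pthread_mutex_init (&pipe->mutex, NULL);
    pipe->head = pipe->tail = NULL;
    pipe->stages = stages;
    pipe->active = 0;
    for (i = 0; i <= stages; i++) {     /* One extra stage: the tail */
        stage = malloc (sizeof *stage);
        if (stage == NULL)
            return -1;                  /* Caller may call layout_free */
        pthread_mutex_init (&stage->mutex, NULL);
        pthread_cond_init (&stage->avail, NULL);
        pthread_cond_init (&stage->ready, NULL);
        stage->data_ready = 0;
        stage->data = 0;
        stage->next = NULL;
        *link = stage;
        link = &stage->next;
    }
    pipe->tail = stage;                 /* Last stage holds the result */
    return 0;
}

/* Hypothetical: count the links followed from head to reach tail. */
int layout_hops (const pipe_t *pipe)
{
    const stage_t *stage;
    int hops = 0;

    for (stage = pipe->head; stage != NULL && stage != pipe->tail;
         stage = stage->next)
        hops++;
    return hops;
}

/* Hypothetical: release everything layout_build allocated. */
void layout_free (pipe_t *pipe)
{
    stage_t *stage = pipe->head, *next;

    while (stage != NULL) {
        next = stage->next;
        pthread_cond_destroy (&stage->ready);
        pthread_cond_destroy (&stage->avail);
        pthread_mutex_destroy (&stage->mutex);
        free (stage);
        stage = next;
    }
    pipe->head = pipe->tail = NULL;
    pthread_mutex_destroy (&pipe->mutex);
}
```

With three worker stages, the list holds four stage_t structures: it takes three hops to get from head to tail, and tail's next pointer is NULL.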
Part 2 shows pipe_send, a utility function used to start data along a pipeline, and also called by each stage to pass data to the next stage.
17-23 It begins by waiting on the specified pipeline stage's ready condition variable until it can accept new data.
28-30 Store the new data value, and then tell the stage that data is available.
■ pipe.c part 2 pipe_send
1 /*
2 * Internal function to send a "message" to the
3 * specified pipe stage. Threads use this to pass
4 * along the modified data item.
5 */
6 int pipe_send (stage_t *stage, long data)
7 {
8 int status;
9
10 status = pthread_mutex_lock (&stage->mutex);
11 if (status != 0)
12 return status;
13 /*
14 * If there's data in the pipe stage, wait for it
15 * to be consumed.
16 */
17 while (stage->data_ready) {
18 status = pthread_cond_wait (&stage->ready, &stage->mutex);
19 if (status != 0) {
20 pthread_mutex_unlock (&stage->mutex);
21 return status;
22 }
23 }
24
25 /*
26 * Send the new data
27 */
28 stage->data = data;
29 stage->data_ready = 1;
30 status = pthread_cond_signal (&stage->avail);
31 if (status != 0) {
32 pthread_mutex_unlock (&stage->mutex);
33 return status;
34 }
35 status = pthread_mutex_unlock (&stage->mutex);
36 return status;
37 }
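The handshake in pipe_send only makes sense alongside a receiver that clears data_ready and signals ready. The following is a minimal, self-contained sketch of both sides, assuming a hypothetical one-slot mailbox (slot_t, slot_send, slot_receive, and slot_demo are illustrative names, not part of pipe.c). For brevity it aborts on errors and does not check the return values of the signal calls.

```c
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical one-slot mailbox; fields mirror those pipe_send uses. */
typedef struct slot_tag {
    pthread_mutex_t mutex;
    pthread_cond_t  avail;       /* Data is ready to process */
    pthread_cond_t  ready;       /* Slot is ready for new data */
    int             data_ready;  /* Nonzero while data is unconsumed */
    long            data;
} slot_t;

/* Same protocol as pipe_send: wait for an empty slot, then fill it. */
static int slot_send (slot_t *slot, long data)
{
    int status = pthread_mutex_lock (&slot->mutex);
    if (status != 0)
        return status;
    while (slot->data_ready) {
        status = pthread_cond_wait (&slot->ready, &slot->mutex);
        if (status != 0) {
            pthread_mutex_unlock (&slot->mutex);
            return status;
        }
    }
    slot->data = data;
    slot->data_ready = 1;
    pthread_cond_signal (&slot->avail);
    return pthread_mutex_unlock (&slot->mutex);
}

/* Receiving side: wait for data, take it, then report the slot empty. */
static int slot_receive (slot_t *slot, long *result)
{
    int status = pthread_mutex_lock (&slot->mutex);
    if (status != 0)
        return status;
    while (!slot->data_ready) {
        status = pthread_cond_wait (&slot->avail, &slot->mutex);
        if (status != 0) {
            pthread_mutex_unlock (&slot->mutex);
            return status;
        }
    }
    *result = slot->data;
    slot->data_ready = 0;
    pthread_cond_signal (&slot->ready);
    return pthread_mutex_unlock (&slot->mutex);
}

typedef struct consumer_tag {
    slot_t *slot;
    int     count;
    long    sum;
} consumer_t;

/* Receive `count` values and add them up. */
static void *consumer (void *arg)
{
    consumer_t *c = (consumer_t*)arg;
    long value;
    int i;

    for (i = 0; i < c->count; i++) {
        if (slot_receive (c->slot, &value) != 0)
            abort ();
        c->sum += value;
    }
    return NULL;
}

/* Send 1..count through the slot; return the sum the consumer saw. */
long slot_demo (int count)
{
    slot_t slot;
    consumer_t c;
    pthread_t thread;
    int i;

    pthread_mutex_init (&slot.mutex, NULL);
    pthread_cond_init (&slot.avail, NULL);
    pthread_cond_init (&slot.ready, NULL);
    slot.data_ready = 0;
    slot.data = 0;
    c.slot = &slot;
    c.count = count;
    c.sum = 0;
    if (pthread_create (&thread, NULL, consumer, &c) != 0)
        abort ();
    for (i = 1; i <= count; i++)
        if (slot_send (&slot, i) != 0)
            abort ();
    pthread_join (thread, NULL);
    pthread_cond_destroy (&slot.ready);
    pthread_cond_destroy (&slot.avail);
    pthread_mutex_destroy (&slot.mutex);
    return c.sum;
}
```

Calling slot_demo (10) returns 55. Because the slot holds only one value, the sender blocks on ready whenever the consumer has not yet taken the previous value, so no value is overwritten or lost.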
Part 3 shows pipe_stage, the start function for each thread in the pipeline. The thread's argument is a pointer to its stage_t structure.
16-27 The thread loops forever, processing data. Because the mutex is locked outside the loop, the thread appears to have the pipeline stage's mutex locked all the time. However, it spends most of its time waiting for new data, on the avail condition variable. Remember that a thread automatically unlocks the mutex associated with a condition variable while it is waiting on that condition variable. In reality, therefore, the thread spends most of its time with the mutex unlocked.
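The claim that a waiting thread does not hold the mutex can be checked directly. The sketch below is an illustration only, with hypothetical names (waiter, wait_releases_mutex); error checking is omitted for brevity. The waiter sets a flag while holding the mutex and never unlocks the mutex explicitly before waiting, so if another thread manages to lock the mutex and sees the flag set, the waiter must be inside pthread_cond_wait.

```c
#include <pthread.h>
#include <sched.h>

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond  = PTHREAD_COND_INITIALIZER;
static int waiting = 0;     /* Set by the waiter while it holds mutex */
static int go = 0;          /* Predicate the waiter waits for */

static void *waiter (void *arg)
{
    (void)arg;
    pthread_mutex_lock (&mutex);
    waiting = 1;
    while (!go)             /* Mutex is released only inside the wait */
        pthread_cond_wait (&cond, &mutex);
    waiting = 0;
    pthread_mutex_unlock (&mutex);
    return NULL;
}

/* Returns 1 if mutex was locked here while the waiter was blocked. */
int wait_releases_mutex (void)
{
    pthread_t thread;
    int acquired = 0;

    waiting = 0;
    go = 0;
    if (pthread_create (&thread, NULL, waiter, NULL) != 0)
        return -1;
    while (!acquired) {
        pthread_mutex_lock (&mutex);
        if (waiting) {
            acquired = 1;   /* Keep the mutex locked for the signal */
        } else {
            pthread_mutex_unlock (&mutex);
            sched_yield (); /* Waiter has not started yet; retry */
        }
    }
    go = 1;
    pthread_cond_signal (&cond);
    pthread_mutex_unlock (&mutex);
    pthread_join (thread, NULL);
    return acquired;
}
```

If pthread_cond_wait did not release the mutex, the pthread_mutex_lock call in wait_releases_mutex could never succeed once the waiter had set the flag, and the function would never return.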
22-26 When given data, the thread increases its own data value by one, and passes the result to the next stage. The thread then records that the stage no longer has data by clearing the data_ready flag, and signals the ready condition variable to wake any thread that might be waiting for this pipeline stage.
■ pipe.c part 3 pipe_stage
1 /*
2 * The thread start routine for pipe stage threads.
3 * Each will wait for a data item passed from the
4 * caller or the previous stage, modify the data
5 * and pass it along to the next (or final) stage.
6 */
7 void *pipe_stage (void *arg)
8 {
9 stage_t *stage = (stage_t*)arg;