10 stage_t *next_stage = stage->next;
11 int status;
12
13 status = pthread_mutex_lock (&stage->mutex);
14 if (status != 0)
15 err_abort (status, "Lock pipe stage");
16 while (1) {
17 while (stage->data_ready != 1) {
18 status = pthread_cond_wait (&stage->avail, &stage->mutex);
19 if (status != 0)
20 err_abort (status, "Wait for previous stage");
21 }
22 pipe_send (next_stage, stage->data + 1);
23 stage->data_ready = 0;
24 status = pthread_cond_signal (&stage->ready);
25 if (status != 0)
26 err_abort (status, "Wake next stage");
27 }
28 /*
29 * Notice that the routine never unlocks the stage->mutex.
30 * The call to pthread_cond_wait implicitly unlocks the
31 * mutex while the thread is waiting, allowing other threads
32 * to make progress. Because the loop never terminates, this
33 * function has no need to unlock the mutex explicitly.
34 */
Part 4 shows pipe_create, the function that creates a pipeline. It can create a pipeline of any number of stages, linking them together in a list.
18-34 For each stage, it allocates a new stage_t structure and initializes the members. Notice that one additional "stage" is allocated and initialized to hold the final result of the pipeline.
36-37 The next member of the final stage (addressed through the link pointer) is set to NULL to terminate the list, and the pipeline's tail is set to point at the final stage. The tail pointer allows pipe_result to easily find the final product of the pipeline, which is stored into the final stage.
52-59 After all the stage data is initialized, pipe_create creates a thread for each stage. The extra "final stage" does not get a thread—the for loop continues only while the current stage's next link is not NULL, so it stops before processing the final stage.
■ pipe.c part 4 pipe_create
1 /*
2 * External interface to create a pipeline. All the
3 * data is initialized and the threads created. They'll
4 * wait for data.
5 */
6 int pipe_create (pipe_t *pipe, int stages)
7 {
8 int pipe_index;
9 stage_t **link = &pipe->head, *new_stage, *stage; 10 int status;
11
12 status = pthread_mutex_init(&pipe->mutex, NULL);
13 if (status != 0)
14 err_abort (status, "Init pipe mutex");
15 pipe->stages = stages;
16 pipe->active = 0;
17
18 for (pipe_index = 0; pipe_index <= stages; pipe_index++) {
19 new_stage = (stage_t*)malloc (sizeof (stage_t));
20 if (new_stage == NULL)
21 errno_abort ("Allocate stage");
22 status = pthread_mutex_init (&new_stage->mutex, NULL);
23 if (status != 0)
24 err_abort (status, "Init stage mutex");
25 status = pthread_cond_init (&new_stage->avail, NULL);
26 if (status != 0)
27 err_abort (status, "Init avail condition");
28 status = pthread_cond_init (&new_stage->ready, NULL);
29 if (status != 0)
30 err_abort (status, "Init ready condition");
31 new_stage->data_ready = 0;
32 *link = new_stage;
33 link = &new_stage->next;
34 }
35
36 *link = (stage_t*)NULL; /* Terminate list */