20 * 4. wait for all threads to quit (counter goes to 0)
21 * Because we don't use join, we don't need to worry
22 * about tracking thread IDs.
23 */
24 if (wq->counter > 0) {
25 wq->quit = 1;
26 /* if any threads are idling, wake them. */
27 if (wq->idle > 0) {
28 status = pthread_cond_broadcast (&wq->cv);
29 if (status != 0) {
30 pthread_mutex_unlock (&wq->mutex);
31 return status;
32 }
33 }
34
35 /*
36 * Just to prove that every rule has an exception, I'm
37 * using the "cv" condition for two separate predicates
38 * here. That's OK, since the case used here applies
39 * only once during the life of a work queue — during
40 * rundown. The overhead is minimal and it's not worth
41 * creating a separate condition variable that would
42 * wait and be signaled exactly once!
43 */
44 while (wq->counter > 0) {
45 status = pthread_cond_wait (&wq->cv, &wq->mutex);
46 if (status != 0) {
47 pthread_mutex_unlock (&wq->mutex);
48 r eturn status;
49 }
50 }
51 }
52 status = pthread_mutex_unlock (&wq->mutex);
53 if (status != 0)
54 return status;
55 status = pthread_mutex_destroy (&wq->mutex);
56 status1 = pthread_cond_destroy (&wq->cv);
57 status2 = pthread_attr_destroy (&wq->attr);
58 return (status ? status : (status1 ? status1 : status2));
59 }
Part 3 shows workq_add
, which accepts work for the queue manager system.
16-35 It allocates a new work queue element and initializes it from the parameters. It
queues the element, updating the first and last pointers as necessary.
40-45 If there are idle engine threads, which were created but ran out of work, signal
the condition variable to wake one.
46-59 If there are no idle engine threads, and the value of parallelism allows for
more, create a new engine thread. If there are no idle threads and it can't create a
new engine thread, workq_add
returns, leaving the new element for the next
thread that finishes its current assignment.
■ workq.c part 3 workq_add
1 /*
2 * Add an item to a work queue.
3 */
4 int workq_add (workq_t *wq, void *element)
5 {
6 workq_ele_t *item;
7 pthread_t id;
8 int status;
9
10 if (wq->valid != WORKQ_VALID)
11 return EINVAL;
12
13 /*
14 * Create and initialize a request structure.
15 */
16 item = (workq_ele_t *)malloc (sizeof (workq_ele_t));
17 if (item == NULL)
18 return ENOMEM;
19 item->data = element;
20 item->next = NULL;
21 status = pthread_mutex_lock (&wq->mutex);
22 if (status != 0) {
23 free (item);
24 return status;
25 }
26
27 /*
28 * Add the request to the end of the queue, updating the
29 * first and last pointers.
30 */
31 if (wq->first == NULL)
32 wq->first = item;
33 else
34 wq->last->next = item;
35 wq->last = item;
36
37 /*
38 * if any threads are idling, wake one.
39 */
40 if (wq->idle > 0) {
41 status = pthread_cond_signal (&wq->cv);
42 if (status != 0) {
43 pthread_mutex_unlock (&wq->mutex);
44 return status;
45 }