3 #include
4 #include "errors.h"
5 #include "workq.h"
6
7 /*
8 * Initialize a work queue.
9 */
10 int workq_init (workq_t *wq, int threads, void (*engine)(void *arg))
11 {
12 int status;
13
14 status = pthread_attr_init (&wq->attr);
15 if (status != 0)
16 return status;
17 status = pthread_attr_setdetachstate (
18 &wq->attr, PTHREAD_CREATE_DETACHED);
19 if (status != 0) {
20 pthread_attr_destroy (&wq->attr);
21 return status;
22 }
23 status = pthread_mutex_init (&wq->mutex, NULL);
24 if (status != 0) {
25 pthread_attr_destroy (&wq->attr);
26 return status;
27 }
28 status = pthread_cond_init (&wq->cv, NULL);
29 if (status != 0) {
30 pthread_mutex_destroy (&wq->mutex);
31 pthread_attr_destroy (&wq->attr);
32 return status;
33 }
34 wq->quit = 0; /* not time to quit */
35 wq->first = wq->last = NULL; /* no queue entries */
36 wq->parallelism = threads; /* max servers */
37 wq->counter = 0; /* no server threads yet */
38 wq->idle = 0; /* no idle servers */
39 wq->engine = engine;
40 wq->valid = WORKQ_VALID;
41 return 0;
42 }
■ workq.c part 1 workq_init
Part 2 shows the workq_destroy function. The procedure for shutting down a work queue is a little different from the others we've seen. Remember that the Pthreads mutex and condition variable destroy functions fail, returning EBUSY, when you try to destroy an object that is in use. We used the same model for barriers and read/write locks. But we cannot do the same for work queues — the calling program cannot know whether the work queue is in use, because the caller only queues requests that are processed asynchronously.
The work queue manager will accept a request to shut down at any time, but it will wait for all existing engine threads to complete their work and terminate. Only when the last work queue element has been processed and the last engine thread has exited will workq_destroy return successfully.
24 If the work queue has no threads, either it was never used or all threads have timed out and shut down since it was last used. That makes things easy, and we can skip all the shutdown complication.
25-33 If there are engine threads, they are asked to shut down by setting the quit flag in the workq_t structure and broadcasting the condition variable to awaken any waiting (idle) engine threads. Each engine thread will eventually run and see this flag. When they see it and find no more work, they'll shut themselves down.
44-50 The last thread to shut down will wake up the thread that's waiting in workq_destroy, and the shutdown will complete. Instead of creating a condition variable that's used only to wake up workq_destroy, the last thread will signal the same condition variable used to inform idle engine threads of new work. At this point, all waiters have already been awakened by a broadcast, and they won't wait again because the quit flag is set. Shutdown occurs only once during the life of the work queue manager, so there's little point to creating a separate condition variable for this purpose.
■ workq.c part 2 workq_destroy
1 /*
2 * Destroy a work queue.
3 */
4 int workq_destroy (workq_t *wq)
5 {
6 int status, status1, status2;
7
8 if (wq->valid != WORKQ_VALID)
9 return EINVAL;
10 status = pthread_mutex_lock (&wq->mutex);
11 if (status != 0)
12 return status;
13 wq->valid = 0; /* prevent any other operations */
14
15 /*
16 * Check whether any threads are active, and run them down:
17 *
18 * 1. set the quit flag
19 * 2. broadcast to wake any servers that may be asleep