33-36 Когда флаг mqflag принимает ненулевое значение, мы регистрируемся на получение уведомления заново и считываем сообщение из очереди. Затем мы разблокируем сигнал SIGUSR1 и возвращаемся к началу цикла.
Мы уже говорили, что в этой версии программы также присутствует ошибка. Посмотрим, что произойдет, если в очередь попадут два сообщения, прежде чем будет считано первое из них. Мы можем имитировать это, добавив sleep перед вызовом mq_notify. Проблема тут в том, что уведомление отсылается только в том случае, когда сообщение помещается в пустую очередь. Если в очередь поступают два сообщения, прежде чем первое будет считано, то отсылается только одно уведомление. Тогда мы считываем первое сообщение и вызываем sigsuspend, ожидая поступления еще одного. А в это время в очереди уже имеется сообщение, которое мы должны прочитать, но которое мы никогда не прочтем.
Пример: уведомление сигналом с отключением блокировки
Исправить описанную выше ошибку можно, отключив блокировку операции считывания сообщений. Листинг 5.10 содержит измененную версию программы из листинга 5.9. Новая программа считывает сообщения в неблокируемом режиме.
//pxmsg/mqnotifysig3.с
1 #include "unpipc.h"
2 volatile sig_atomic_t mqflag; /* ненулевое значение устанавливается обработчиком сигнала */
3 static void sig_usr1(int);
4 int
5 main(int argc, char **argv)
6 {
7 mqd_t mqd;
8 void *buff;
9 ssize_t n;
10 sigset_t zeromask, newmask, oldmask;
11 struct mq_attr attr;
12 struct sigevent sigev;
13 if (argc != 2)
14 err_quit("usage: mqnotifysig3
15 /* открытие очереди, получение атрибутов, выделение буфера */
16 mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);
17 Mq_getattr(mqd, &attr);
18 buff = Malloc(attr.mq_msgsize);
19 Sigemptyset(&zeromask); /* сигналы не блокируются */
20 Sigemptyset(&newmask);
21 Sigemptyset(&oldmask);
22 Sigaddset(&newmask, SIGUSR1);
23 /* установка обработчика, включение уведомления */
24 Signal(SIGUSR1, sig_usr1);
25 sigev.sigev_notify = SIGEV_SIGNAL;
26 sigev.sigev_signo = SIGUSR1;
27 Mq_notify(mqd, &sigev);
28 for (;;) {
29 Sigprocmask(SIG_BLOCK, &newmask, &oldmask); /* блокируем SIGUSR1 */
30 while (mqflag == 0)
31 sigsuspend(&zeromask);
32 mqflag = 0; /* сброс флага */
33 Mq_notify(mqd, &sigev); /* перерегистрируемся */
34 while ((n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {
35 printf("read $ld bytes\n", (long) n);
36 }
37 if (errno != EAGAIN)
38 err_sys("mq_receive error");
39 Sigprocmask(SIG_UNBLOCK, &newmask, NULL); /* разблокируем SIGUSR1 */
40 }
41 exit(0);
42 }
43 static void
44 sig_usr1(int signo)
45 {
46 mqflag = 1;
47 return;
48 }
15-18 Первое изменение в программе: при открытии очереди сообщений указывается флаг O_NONBLOCK.
34-38 Другое изменение: mq_receive вызывается в цикле, считывая все сообщения в очереди, пока не будет возвращена ошибка с кодом EAGAIN, означающая отсутствие сообщений в очереди.
Пример: уведомление с использованием sigwait вместо обработчика