#include
#include "tlpi_hdr.h"
#define NOTIFY_SIG SIGUSR1
static void
handler(int sig)
{
/* Просто прерываем вызов sigsuspend() */
}
int
main(int argc, char *argv[])
{
struct sigevent sev;
mqd_t mqd;
struct mq_attr attr;
void *buffer;
ssize_t numRead;
sigset_t blockMask, emptyMask;
struct sigaction sa;
if (argc!= 2 || strcmp(argv[1], "-help") == 0)
usageErr("%s mq-name\n", argv[0]);
if (mqd == (mqd_t) -1)
errExit("mq_open");
errExit("mq_getattr");
if (buffer == NULL)
errExit("malloc");
sigaddset(&blockMask, NOTIFY_SIG);
if (sigprocmask(SIG_BLOCK, &blockMask, NULL) == -1)
errExit("sigprocmask");
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
sa.sa_handler = handler;
if (sigaction(NOTIFY_SIG, &sa, NULL) == -1)
errExit("sigaction");
sev.sigev_signo = NOTIFY_SIG;
if (mq_notify(mqd, &sev) == -1)
errExit("mq_notify");
sigemptyset(&emptyMask);
for (;;) {
errExit("mq_notify");
printf("Read %ld bytes\n", (long) numRead);
if (errno!= EAGAIN) /* Непредвиденная ошибка */
errExit("mq_receive");
}
}
pmsg/mq_notify_sig.c
Ряд аспектов программы из листинга 48.6 заслуживает дополнительного внимания.
• Вместо использования вызова pause() мы блокируем сигнал и ждем его с помощью вызова sigsuspend(); так можно исключить возможность потери сигнала, который доставляется, пока программа занимается другими делами (то есть когда она не заблокирована в ожидании сигнала) в цикле for. Если бы мы задействовали для этого функцию pause(), то следующий ее вызов был бы заблокирован, несмотря на уже доставленный сигнал.
• Очередь открывается в неблокирующем режиме; при поступлении оповещения мы применяем цикл while для прочтения сообщения из очереди. Опустошение очереди таким способом гарантирует, что процесс будет оповещен при появлении нового сообщения. Использование неблокирующего режима означает, что при опустошении очереди цикл while завершится неудачей (вызов mq_receive() вернет ошибку EAGAIN). Такой подход аналогичен применению неблокирующего ввода/вывода в сочетании с оповещениями, срабатывающими при готовности (о чем мы поговорим в подразделе 59.1.1), и применяется по схожим причинам.
• Важно, чтобы в цикле for подписка на оповещение произошла
48.6.2. Получение уведомлений в отдельном потоке
В листинге 48.7 показан пример оповещения с помощью потоков. В этой программе использован целый ряд архитектурных решений, которые применяются в листинге 48.6:
• при появлении оповещения программа, прежде чем опустошить очередь, возобновляет свою подписку
• применяется неблокирующий режим; это позволяет полностью опустошить очередь после получения оповещения, не блокируя программу
Листинг 48.7. Получение оповещений с помощью потока
pmsg/mq_notify_thread.c
#include
#include
#include
#include "tlpi_hdr.h"
static void notifySetup(mqd_t *mqdp);
static void /* Функция оповещения потока */
{
ssize_t numRead;
mqd_t *mqdp;
void *buffer;
struct mq_attr attr;
mqdp = sv.sival_ptr;
if (mq_getattr(*mqdp, &attr) == -1)
errExit("mq_getattr");
buffer = malloc(attr.mq_msgsize);
if (buffer == NULL)
errExit("malloc");
while ((numRead = mq_receive(*mqdp, buffer, attr.mq_msgsize,
NULL)) >= 0)
printf("Read %ld bytes\n", (long) numRead);
if (errno!= EAGAIN) /* Непредвиденная ошибка */
errExit("mq_receive");
free(buffer);
pthread_exit(NULL);
}
static void
notifySetup(mqd_t *mqdp)
{
struct sigevent sev;
sev.sigev_notify_function = threadFunc;
sev.sigev_notify_attributes = NULL;
/* Может быть указателем на структуру pthread_attr_t */
if (mq_notify(*mqdp, &sev) == -1)
errExit("mq_notify");
}
int
main(int argc, char *argv[])
{
mqd_t mqd;
if (argc!= 2 || strcmp(argv[1], "-help") == 0)
usageErr("%s mq-name\n", argv[0]);
if (mqd == (mqd_t) -1)
errExit("mq_open");
pause(); /* Ждем оповещений через функцию потока */
}