ПРИМЕЧАНИЕ
Наша схема может оказаться медленной в случае наличия в очереди большого количества сообщений, поскольку каждый раз при добавлении нового придется просматривать их значительную часть. Можно хранить отдельно индексы последних сообщений со всеми имеющимися значениями приоритета.
75-77 Если очередь была пуста в момент помещения в нее нового сообщения, мы вызываем pthread_cond_signal, чтобы разблокировать любой из процессов, ожидающих сообщения.
78 Увеличиваем на единицу количество сообщений в очереди mq_curmsgs.
Функция mq_receive
В листинге 5.27 приведен текст первой половины функции mq_receive, которая получает необходимые указатели, блокирует взаимное исключение и проверяет объем буфера вызвавшего процесса, который должен быть достаточным для помещения туда сообщения максимально возможной длины.
30-40 Если очередь пуста и установлен флаг O_NONBLOCK, возвращается ошибка с кодом EAGAIN. В противном случае увеличивается значение счетчика mqh_nwait, который проверяется функцией mq_send (листинг 5.25) в случае, если очередь пуста и есть процессы, ожидающие уведомления. Затем мы ожидаем сигнала по условной переменной, который будет передан функцией mq_send (листинг 5.26).
ПРИМЕЧАНИЕ
Наша реализация mq_receive, как и реализация mq_send, упрощает ситуацию с ошибкой EINTR, возвращаемой при прерывании ожидания сигналом, перехватываемым вызвавшим процессом.
В листинге 5.28 приведен текст второй половины функции mq_receive. Мы уже знаем, что в очереди есть сообщение, которое можно будет возвратить вызвавшему процессу.
//my_pxmsg_mmap/mq_receive.с
1 #include "unpipc.h"
2 #include "mqueue.h"
3 ssize_t
4 mymq_receive(mymqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop)
5 {
6 int n;
7 long index;
8 int8_t *mptr;
9 ssize_t len;
10 struct mymq_hdr *mqhdr;
11 struct mymq_attr *attr;
12 struct mymsg_hdr *msghdr;
13 struct mymq_info *mqinfo;
14 mqinfo = mqd;
15 if (mqinfo-mqi_magic != MQI_MAGIC) {
16 errno = EBADF;
17 return(-1);
18 }
19 mqhdr = mqinfo-mqi_hdr; /* указатель struct */
20 mptr = (int8_t *) mqhdr; /* указатель на байт */
21 attr = mqhdr-mqh_attr;
22 if ((n = pthread_mutex_lock(mqhdr-mqh_lock)) != 0) {
23 errno = n;
24 return(-1);
25 }
26 if (maxlen attr-mq_msgsize) {
27 errno = EMSGSIZE;
28 goto err;
29 }
30 if (attr-mq_curmsgs = 0) { /* очередь пуста */
31 if (mqinfo-mqi_flags O_NONBLOCK) {
32 errno = EAGAIN;
33 goto err;
34 }
35 /* ожидаем помещения сообщения в очередь */
36 mqhdr-mqh_nwait++;
37 while (attr-mq_curmsgs == 0)
38 pthread_cond_wait(mqhdr-mqh_wait, mqhdr-mqh_lock);
39 mqhdr-mqh_nwait--;
40 }
//my_pxmsg_mmap/mq_receive.c
41
if ((index = mqhdr-mqh_head) == 0)
42 err_dump("mymq_receive: curmsgs = %ld; head = 0", attr-mq_curmsgs);
43 msghdr = (struct mymsg_hdr *) mptr[index];
44 mqhdr-mqh_head = msghdr-msg_next; /* новое начало списка */
45 len = msghdr-msg_len;
46 memcpy(ptr, msghdr + 1, len); /* копирование самого сообщения */