105 goto err;
106 close(fd);
107 return((mymqd_t) mqinfo);
108 }
51-58 Вычисляется размер сообщения, который затем округляется до кратного размеру длинного целого. Также в файле отводится место для структуры mq_hdr в начале файла и msghdr в начале каждого сообщения (рис. 5.2). Размер вновь созданного файла устанавливается функцией lseek и записью одного байта со значением 0. Проще было бы вызвать ftruncate (раздел 13.3), но у нас нет гарантий, что это сработало бы для увеличения размера файла.
59-63 Файл отображается в память функцией mmap.
64-66 При каждом вызове mq_open создается отдельный экземпляр mq_infо. Эта структура после создания инициализируется.
67-87 Инициализируется структура mq_hdr. Заголовок связного списка сообщений (mqh_head) инициализируется нулем, а все сообщения в очереди добавляются к списку свободных (mqh_frее).
88-102 Поскольку очереди сообщений Posix могут использоваться совместно произвольным количеством процессов, которые знают имя очереди и имеют соответствующие разрешения, нам нужно инициализировать взаимное исключение и условную переменную с атрибутом PTHREAD_PROCESS_SHARED. Для этого мы сначала инициализируем атрибуты вызовом pthread_mutexattr_init, а затем устанавливаем значение атрибута совместного использования процессами, вызвав pthread_mutexattr_setpshared. После этого взаимное исключение инициализируется вызовом pthread_mutex_init. Аналогичные действия выполняются для условной переменной. Мы должны аккуратно удалить взаимное исключение и условную переменную даже при возникновении ошибки, поскольку вызовы pthread_ mutexattr_init и pthread_condattr_init выделяют под них память (упражнение 7.3).
103-107 После инициализации очереди сообщений мы сбрасываем бит user-execute. Это говорит другим процессам о том, что очередь была проинициализирована. Мы также закрываем файл вызовом close, поскольку он был успешно отображен в память и держать его открытым больше нет необходимости.
В листинге 5.19 приведен конец функции mq_open, в котором осуществляется открытие существующей очереди сообщений.
//my_pxmsg_mmap/mq_open.с
109 exists:
110 /* открытие файла и отображение его в память */
111 if ((fd = open(pathname, O_RDWR)) < 0) {
112 if (errno == ENOENT && (oflag & O_CREAT))
113 goto again;
114 goto err;
115 }
116 /* проверяем, что инициализация завершена */
117 for (i = 0; i < MAX TRIES; i++) {
118 if (stat(pathname, &statbuff) == –1) {
119 if (errno == ENOENT && (oflag & O_CREAT)) {
120 close(fd);
121 goto again;
122 }
123 goto err;
124 }
125 if ((statbuff.st_mode & S_IXUSR) == 0)
126 break;
127 sleep(1);
128 }
129 if (i == MAX_TRIES) {
130 errno = ETIMEDOUT;
131 goto err;
132 }
133 filesize = statbuff.st_size;
134 mptr = mmap(NULL, filesize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
135 if (mptr == MAP_FAILED)
136 goto err;
137 close(fd);
138 /* выделяем одну mymq_info{} для каждого вызова open */
139 if ((mqinfo = malloc(sizeof(struct mymq_info))) == NULL)
140 goto err;
141 rnqinfo->mqi_hdr = (struct mymq_hdr *) mptr;
142 mqinfo->mqi_magic = MQI_MAGIC;
143 mqinfo->mqi_flags = nonblock;
144 return((mymqd_t) mqinfo);
145 pthreaderr:
146 errno = i;
147 err:
148 /* не даем следующим вызовам изменить errno */
149 save_errno = errno;
150 if (created)
151 unlink(pathname);
152 if (mptr != MAP_FAILED)
153 munmap(mptr, filesize);
154 if (mqinfo != NULL)