Хотя такой вариант может применяться для некоторых драйверов (как правило, для псевдоустройств, например, /dev/null), в общем случае устройство не может быть все время готово к обработке данных, а потеря данных из-за занятости устройства недопустима. Таким образом, в потоке может происходить блокирование передачи данных[60], и эта ситуация не должна приводить к потере сообщений, во избежание которой необходим согласованный между модулями механизм управления потоком. Для этого сообщения обрабатываются и буферизуются в соответствующей очереди модуля, а их передача возлагается на функцию
, вызываемую ядром автоматически. Для каждой очереди определены две ватерлинии — верхняя и нижняя, которые используются для контроля заполненности очереди. Если число сообщений превышает верхнюю ватерлинию, очередь считается переполненной, и передача сообщений блокируется, пока их число не станет меньше нижней ватерлинии.
Рассмотрим пример потока, модули 1 и 3 которого поддерживают управление потоком данных, а модуль 2 — нет. Другими словами, модуль 2 не имеет процедуры
. Когда сообщение достигает модуля 3, вызывается его функция
. После необходимой обработки сообщения, оно помещается в очередь модуля 3 с помощью функции
mod1put(queue_t* q, mblk_t* mp) {
/* Необходимая обработка сообщения */
...
putq(q, mp);
}
Через некоторое время ядро автоматически запускает процедуру
модуля 3. Для каждого сообщения очереди xxput()
вызывает функцию
#include
int canput(queue_t* q);
Заметим, что
. В противном случае, как уже говорилось, очередь модуля не принимает участия в передаче данных. В нашем примере,
либо передаст сообщение следующему модулю (в нашем примере — модулю 2, который после необходимой обработки сразу же передаст его модулю 1), либо вернет сообщение обратно в очередь, если следующая очередь переполнена.
Описанная схема показана на рис. 5.20. Ниже приведен скелет процедуры
модуля 3, иллюстрирующий описанный алгоритм передачи сообщений с использованием механизма управления передачей данных.
Рис. 5.20. Управление потоком данных
mod1service(queue_t *q) {
mblk_t* mp;
while ((mp = getq(q)) != NULL) {
if (canput(q->q_next))
putnext(q, mp);
else {
putbq(q, mp);
break;
}
}
В этом примере функция
завершает свою работу, и сообщения начинают буферизоваться в очереди модуля 3. При этом очередь временно исключается из списка очередей, ожидающих обработки, и процедура
для нее вызываться не будет. Данная ситуация продлится до тех пор, пока число сообщений очереди записи модуля 1 не станет меньше нижней ватерлинии.
Пока существует возникшая блокировка передачи, затор будет постепенно распространяться вверх по потоку, последовательно заполняя очереди модулей, пока, в конечном итоге, не достигнет головного модуля. Поскольку передачу данных в головной модуль (вниз по потоку) инициирует приложение, попытка передать данные в переполненный головной модуль вызовет блокирование процесса[61] и переход его в состояние сна.
В конечном итоге, модуль 1 обработает сообщения своей очереди, и их число станет меньше нижней ватерлинии. Как только очередь модуля 1 станет готовой к приему новых сообщений, планировщик STREAMS автоматически вызовет процедуры
для модулей, ожидавших освобождения очереди модуля в нашем примере — для модуля 3.
Управление передачей данных в потоке требует согласованной работы всех модулей. Например, если процедура
буферизует сообщения для последующей обработки
, такой алгоритм должен выполняться для всех сообщений.[62] В противном случае, это может привести к нарушению порядка сообщений, и как следствие, к потере данных.
Вильям Л Саймон , Вильям Саймон , Наталья Владимировна Макеева , Нора Робертс , Юрий Викторович Щербатых
Зарубежная компьютерная, околокомпьютерная литература / ОС и Сети, интернет / Короткие любовные романы / Психология / Прочая справочная литература / Образование и наука / Книги по IT / Словари и Энциклопедии