Рассматриваемые примитивы служат принципиально различным целям. Мьютекс, как уже было сказано ранее, предназначен в первую очередь для регламентации доступа к участкам программного кода. Семафоры же больше предназначены для регламентации порядка доступа к определенным объектам данных. Классическими задачами этого класса являются задачи «производитель-потребитель», когда M производителей создают некоторые объекты данных (читая эти данные с реальных внешних устройств, или создавая их как результат только внутренних вычислений, или любым другим способом), а N потребителей независимо берут произведенные объекты данных на последующую обработку.
Это настолько общий и часто встречающийся класс задач, что покажем для него простейший «скелет» в виде отдельного приложения, в котором отслеживание порядка доступа потребителей будет осуществлять счетный семафор (
delay
) на случайную величину (производство и обработка объектов данных в коде не показаны, так как это не относится к существу рассматриваемого — нас интересуют процессы синхронизации этих операций, а не сами операции).
Кроме основной нашей цели это приложение дополнительно демонстрирует:
• Практическое использование принудительного завершения (отмены) потоков «извне» с управлением состоянием завершаемости потоков и расстановкой точек отмены, о чем мы уже говорили ранее.
• Использование атомарных (непрерываемых) операций (например,
atomic_add_value
), о которых мы будем говорить чуть позже.
• Использование реентерабельных форм функций стандартной библиотеки, безопасных в многопоточной среде (
rand_r
вместо
rand
).
#include
#include
#include
#include
#include
#include
#include
#include
const int D = 10;
unsigned int T = 2;
static sem_t sem;
pthread_t* tid;
void* writer(void* data) {
unsigned long i = (int)(data); // общий размер выборки
unsigned int s = 1;
while (i-- > 0) {
delay((long)rand_r(&s) * D / RAND_MAX + 1);
sem_post(&sem); // объект данных произведен
}
for (i = 0; i < T; i++) pthread_cancel(tid[i + 1]);
return NULL;
}
static char *str; // строка результирующей диагностики
static volatile unsigned ind = 0;
void* reader(void*) {
char tid[8];
sprintf(tid, "%X", pthread_self);
unsigned int s = rand;
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
while(true) {
sem_wait(&sem); // получен объект данных
str[atomic_add_value(&ind, 1)] = *tid;
pthread_testcancel;
delay((long)rand_r(&s) * D * T / RAND_MAX + 1);
}
return NULL;
}
int main(int argc, char *argv[]) {
unsigned long N = 1000;
int opt, val;
while ((opt = getopt(argc, argv, "n:t:")) != -1) {
switch(opt) {
case 'n':
if (sscanf(optarg, "%i", &val) != 1)
cout << "parse command line error" << endl, exit(EXIT_FAILURE);
if (val > 0) N = val;
break;
case 't':
if (sscanf(optarg, "%i", &val) != 1)
cout << "parse command line error" << endl, exit(EXIT_FAILURE);
if (val > 0) T = val;
break;
default:
exit(EXIT_FAILURE);
}
}
str = new char[N + 1];
tid = new pthread_t[T + 1];
if (sem_init(&sem, 0, 0))
perror("semaphore init"), exit(EXIT_FAILURE);
if (pthread_create(tid, NULL, writer, (void*)N) >= EOK)
perror("writer create error"), exit(EXIT_FAILURE);
for (int i = 0; i < T; i++)
if (pthread_create(tid + i + 1, NULL, reader, NULL) != EOK)