geektimes

Posix threads

  • суббота, 24 января 2015 г. в 02:14:09
http://habrahabr.ru/post/248651/

image

Рано или поздно любой программист в своей жизни встречается с многопоточностью.

Многопоточность – это свойство алгоритма распаралеливатся, т.е. выполнять операции некими вычислителями независимо друг от друга.

Самый простой пример — это программирование GUI. GUI всегда должен быть отрисован, вне зависимости от того, что делается в программе. Например, loading screen – он всего лишь показывает прогресс, по мере загрузки ресурсов. И выглядит это примерно так:



Для решения подобных задач используются потоки (threads). Почти каждый язык программирования содержит свой wrapper над потоками ОС, или реализует свои потоки (привет, велосипед). На Хабре есть довольно много статей про потоки, например, «Что такое нити (threads)?» или «Многопоточность в Java». В общем, поиск в помощь. Но эта статья будет отличатся тем, что здесь будут рассмотрены нативные потоки ОС – POSIX threads.

POSIX (англ. portable operating system interface for Unix — переносимый интерфейс операционных систем Unix) — набор стандартов, описывающих интерфейсы между операционной системой и прикладной программой (системный API), библиотеку языка C и набор приложений и их интерфейсов. Стандарт создан для обеспечения совместимости различных UNIX-подобных операционных систем и переносимости прикладных программ на уровне исходного кода, но может быть использован и для не-Unix систем. Серия стандартов POSIX была разработана комитетом 1003 IEEE. Международная организация по стандартизации (ISO) совместно c Международной электротехнической комиссией (IEC) приняли данный стандарт (POSIX) под названием ISO/IEC 9945.

Собственно, интересен будет только POSIX.1c, расширения потоков (IEEE Std 1003.1c-1995). Этот стандарт определяет API для создания и управления потоками и тп. Список основных функций API.

Рассмотрим пример приведенного выше loading-screen'a максимально упрощенной реализации на С.

#include <pthread.h>
#include <unistd.h>
#define nil NULL
size_t progress = 0;
void drawProgress(void) {
    while(progress < 100) {
        printf("Progress %lu \n", progress);
        sleep(1);
    }
}
void loadData(void) {
    while(progress < 100) {
        ++progress;
        sleep(1);
    }
}
int main(int argc, const char *argv[]) {
    pthread_t drawer, loader;
    pthread_create(&drawer, nil, (void *(*)(void *)) drawProgress, nil);
    pthread_create(&loader, nil, (void *(*)(void *)) loadData, nil);
    pthread_join(loader, nil);
    pthread_join(drawer, nil);
    return 0;
}

Как видим, сначала мы создаем рабочие структуры хранения информации о потоках типа pthread_t – так выглядят потоки для пользователя API Posix.

Далее мы иницилизируем их с помощью функции pthread_create, где первым аргументом идет указатель на дексриптор потока, вторым аргументом идет переменная параметром потока, которая выше nil, третьим аргуметом – функция, которую выполняет поток, и четвертым параметром – аргументы, которые передаются в это функцию, тоже nil (nil мне кажется более синтаксически приятным чем NULL, тем более привычка). Аргумент и возвращаемые параментры функции, которую выполняет поток, должны быть void* — т.е. сырой указатель на данные.

Сразу после того, как был вызван pthread_create, поток отделяется и начинает выполнять свою функцию. Т.е. все, что после pthread_create происходит в функции – происходит параллельно main потока (в том котором выполняется main функция).

pthread_join – функция, которая нужна для того, чтобы main поток не завершился сразу после создания тех двух, а ждал их завершения. Представим примерную диаграмму:



В примере выше мы сначала ждем завершение потока loader, а потом drawer. Но обычно не важен порядок ожидания потоков, если мы ничего после этого не выполняем.

Попробуем пробросить аргумент с несколькими аргументами. Будет аналогично, т.к. можно просто передать указатель на массив аргументов или на структуру с аргументами и т.д. Главное правильно преобразовать типы. И добавим отмену.

typedef struct {
    size_t progress;
    int errorCode;
} ThreadArg;
void drawProgress(void *arg) {
    ThreadArg *argument = arg;
    while(argument->progress < 100) {
        printf("Progress %lu \n", argument->progress);
        sleep(1);
        if(argument->progress == 7) {
            argument->errorCode = -1;
        }
    }
}
void loadData(void *arg) {
    ThreadArg *argument = arg;
    while(argument->progress < 100) {
        ++argument->progress;
        sleep(1);
        if(argument->errorCode == -1) {
            pthread_cancel(pthread_self());
        }
    }
}
int main(int argc, const char *argv[]) {
    pthread_t drawer, loader;
    ThreadArg arg;
    pthread_create(&drawer, nil, (void *(*)(void *)) drawProgress, &arg);
    pthread_create(&loader, nil, (void *(*)(void *)) loadData, &arg);
    pthread_join(loader, nil);
    pthread_join(drawer, nil);
    return 0;
}

Видно, что печать происходит некорректно и после progress 8 печатает одно и тоже. Тем самым вызывая бесконечный цикл. Функция pthread_cancel прерывает исполнение потока с заданным дескриптором. А функция pthread_self возвращает дескриптор потока, в котором она вызвана. Изменим цикл draw для завершения по ошибке.

void drawProgress(void *arg) {
    ThreadArg *argument = arg;
    while(argument->progress < 100) {
        printf("Progress %lu \n", argument->progress);
        sleep(1);
        if(argument->progress == 7) { // cимуляция ошибки
            argument->errorCode = -1;
            pthread_cancel(pthread_self()); // самозавершение
        }
    }
}

Но, увы, вывод программы похож на что-то подобное:

Progress 0
Progress 1
Progress 3
Progress 4
Progress 4
Progress 5
Progress 6
Progress 8

Видно, что прогресс отображается не плавно. Для плавности отображения нам нужно как-то синхронизировать потоки, для того, чтобы сначала один изменял переменную, а второй ее рисовал. Добавим немного condition variables.

pthread_mutex_t conditionMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t condition = PTHREAD_COND_INITIALIZER;
void drawProgress(void *arg) {
    ThreadArg *argument = arg;
    while(argument->progress < 10) {
        pthread_mutex_lock(&conditionMutex); 
        pthread_cond_wait(&condition, &conditionMutex);
        printf("Progress %lu \n", argument->progress);
        pthread_mutex_unlock(&conditionMutex); 
    }
}
void loadData(void *arg) {
    ThreadArg *argument = arg;
    while(argument->progress < 10) {
        ++argument->progress;
        pthread_cond_signal(&condition);
        sleep(1);
    }
}

Функция pthread_cond_wait ждет, пока кто-либо вызовет pthread_cond_signal для текущего condition. Т.е. это элементарный способ для синхронизации между потоками, но будьте осторожны с pthread_cond_wait одного condtion для разных потоков, это может вызвать dead lock. Именно поэтому они всегда ходят парами с мьютексом.

Для рассмотрения мьютекса давайте усложним наш пример. Допустим, loader-ов несколько и каждый теперь складывает ресурсы в общий мешок. Т.е. есть массив и переменная размера. Суть проблемы в том, что без синхронизации потоки могут положить ресурсы в одну и ту же ячейку, затирая данные друг друга и/или пропуская ячейки без данных.

Например, код далее будет работать некорректно:

#define threadsCount 3
#define arraySize 15
typedef struct {
    size_t size;
    size_t data[arraySize];
    pthread_mutex_t mutex;
} ThreadArray;
typedef struct {
    ThreadArray *array;
    size_t ThreadId;
} ThreadArgument;
void loadData(void *arg) {
    ThreadArgument *arg1 = arg;
    ThreadArray *array = arg1->array;
    size_t countToLoad = arraySize / threadsCount;
    while(countToLoad) {
//        pthread_mutex_lock(&array->mutex);
        array->data[array->size] = arg1->ThreadId;
        ++array->size;
//        pthread_mutex_unlock(&array->mutex);
        --countToLoad;
        sleep(1);
    }
    free(arg1);
}
int main(int argc, const char *argv[]) {
    pthread_t loaders[threadsCount];
    size_t iterator;
    ThreadArray *array = malloc(sizeof(ThreadArray));
//    pthread_mutexattr_t attributes;
//    pthread_mutexattr_init(&attributes);
//    pthread_mutexattr_settype(&attributes, PTHREAD_MUTEX_NORMAL);
//    pthread_mutex_init(&array->mutex, &attributes);
    array->size = 0;
    for(iterator = 0; iterator < threadsCount; ++iterator) {
        ThreadArgument *arg = malloc(sizeof(ThreadArgument));
        arg->array = array;
        arg->ThreadId = iterator + 1;
        pthread_create(&loaders[iterator], nil, (void *(*)(void *)) loadData, arg);
    }
    for(iterator = 0; iterator < threadsCount; ++iterator) {
        pthread_join(loaders[iterator], nil);
    }
    for(iterator = 0; iterator < arraySize; ++iterator) {
        printf("%lu ", array->data[iterator]);
    }
    free(array);
    return 0;
}

Так как в выводе у нас будут 0, то код некорректен. Добавим мьютексов, раскоментировав строки.

Любой мьютекс бывает трех типов – normal, recursive, error check. Нормальный мьютекс можно захватить один раз в одном потоке, повторный захват вызовет ошибку или dead lock. Рекурсивный мьютекс можно захватить n раз, но потом прийдется его освободить n раз (если не освободить n раз – dead lock), он работает медленнее, т.к. по устройству он сложнее. Error checking mutex содержит несколько дополнительных кодов ошибок.

Дело в том, что pthread_mutex_lock мгновенно захватывает мьютек, если он не захвачен, если же захвачен, то эта функция ждет его освобождения. Т.е. если три потока захватывают один мьютекс, какой то из-них их захватывают, а остальные ждут, пока тот, кто его захватил, освободит с помощью pthread_mutex_unlock.

Весь код внутри этих двух функций называют критической секцией. Т.е. кодом, исполнение которого возможно лишь одним потоком.

Обратим внимание на то, что мьютекс входит в структуру самого массива, это значит, что именно один и тот же массив нельзя использовать несколькими потоками, если же это разные массивы ( с разными мьютексами), то такое использование будет разрешено.

Обычно данного базиса хватает для большинства многопоточных приложений, но если возникнут какие-либо предложения или материал для дополнительного рассмотрения – пишите.

P.S. На написание этого топика мне сподвигло добавление многопоточности в libRay и почти все контейнеры там стали опционально потоко-безопасными.

Чудесные материалы по данной теме (на англ языке)


computing.llnl.gov/tutorials/pthreads/#ConVarSignal
maxim.int.ru/bookshelf/PthreadsProgram/htm/r_28.html
docs.oracle.com/cd/E19455-01/806-5257/6je9h032r/index.html