Powered By Blogger

Tuesday, April 27, 2010

Ещё немного о потоках ядра: kthreadd

ещё буквально пара слов о потоках ядра. Вероятно, Вы замечали в выводе ps -ef поток ядра kthreadd. Наверняка, у Вас даже возникал вопрос, для чего он нужен? На самом деле, всё достаточно просто. Опосредованно взаимодействуя с помощью определённых API с данным потоком, различные части ядра могут ставить в очередь на создание новые потоки, которые и создаёт kthreadd. Данные API ядра используются наряду с функцией kernel_thread(), с тем отличием, что создание нового процесса происходит не сразу же. Сам поток kthreadd стартует после инициализации основного потока ядра в функции rest_init() init/main.c:

static noinline void __init_refok rest_init(void)
        __releases(kernel_lock)
{
        int pid;

        rcu_scheduler_starting();
        kernel_thread(kernel_init, NULL, CLONE_FS | CLONE_SIGHAND);
        numa_default_policy();
        pid = kernel_thread(kthreadd, NULL, CLONE_FS | CLONE_FILES);
        kthreadd_task = find_task_by_pid_ns(pid, &init_pid_ns);
        unlock_kernel();

        /*
        * The boot idle thread must execute schedule()
        * at least once to get things moving:
        */
        init_idle_bootup_task(current);
        preempt_enable_no_resched();
        schedule();
        preempt_disable();

        /* Call into cpu_idle with preempt disabled */
        cpu_idle();
}
Описатель задачи (struct task_struct) kthreadd хранится в переменной ядра kthreadd_task. Заметьте, что функция kernel_thread() возвращает идентификатор нового процесса (потока). Для того, чтобы получить описатель задачи, в ядре, в частности в приведённом коде, используется функция find_task_by_pid_ns(), первый аргумент которой - идентификатор потока, чей описатель задачи нам нужен, а второй - пространство идентификаторов процесса-предка (в данном случае - init).

Функция потока kthreadd реализована в kernel/kthread.c:

int kthreadd(void *unused)
{
        struct task_struct *tsk = current;

        /* Setup a clean context for our children to inherit. */
        set_task_comm(tsk, "kthreadd");
        ignore_signals(tsk);
        set_cpus_allowed_ptr(tsk, cpu_all_mask);
        set_mems_allowed(node_possible_map);

        current->flags |= PF_NOFREEZE | PF_FREEZER_NOSIG;

        for (;;) {
                set_current_state(TASK_INTERRUPTIBLE);
                if (list_empty(&kthread_create_list))
                        schedule();
                __set_current_state(TASK_RUNNING);

                spin_lock(&kthread_create_lock);
                while (!list_empty(&kthread_create_list)) {
                        struct kthread_create_info *create;

                        create = list_entry(kthread_create_list.next,
                                struct kthread_create_info, list);
                        list_del_init(&create->list);
                        spin_unlock(&kthread_create_lock);

                        create_kthread(create);

                        spin_lock(&kthread_create_lock);
                }
                spin_unlock(&kthread_create_lock);
        }

        return 0;
}
Если не вдаваться сейчас во все детали, то из кода видно, что kthreadd() "крутится" в вечном цикле, в начале каждого прохода проверяя состояние списка kthread_create_list. Если список пуст, то поток устанавливает своё состояние как "спящий" и отдаёт управление, вызывая функцию планировщика schedule(). Если в списке есть элементы, то поток взводит спин-блокировку на списке, чтобы обезопасить его от изменений и до тех пор, пока список не пуст последовательно выполняет следующие действия:
  1. в переменную create типа struct create_thread_nfo* получаем элемент списка;
  2. удаляем элемент из списка;
  3. снимаем спин-лок со списка, так что теперь в него снова можно добавлять новые элементы извне;
  4. используя данные, находящиеся по адресу, сохранённому в create, создаём новый поток с помощью вспомогательной функции create_kthread() (не путать с kthread_create() и kernel_thread()! в отличие от них, create_kthread() не экспортируется за пределы kthread.o);
  5. ну и наконец снова взводим спин-лок на списке, чтобы не произошло ничего неожиданного, пока мы будем проверять пуст ли список потоков к созданию :)

Очередь потоков к созданию - это нечто иное, как двусвязный список, о ктором я уже писал. Вот как выглядит структура-элемент этого списка:

struct kthread_create_info
{
        /* Information passed to kthread() from kthreadd. */
        int (*threadfn)(void *data);
        void *data;

        /* Result passed back to kthread_create() from kthreadd. */
        struct task_struct *result;
        struct completion done;

        struct list_head list;
};
Самые интересные на данный момент поля здесь - это threadfn - указатель на функцию, которая должна выполняться в отдельном потоке, data - указатель на данные, которые будут использоваться потоком, result - указатель на описатель задачи для нового потока.

В список новые элементы добавляются с помощью функции kthread_create():

/**
* kthread_create - create a kthread.
* @threadfn: the function to run until signal_pending(current).
* @data: data ptr for @threadfn.
* @namefmt: printf-style name for the thread.
*
* Description: This helper function creates and names a kernel
* thread.  The thread will be stopped: use wake_up_process() to start
* it.  See also kthread_run(), kthread_create_on_cpu().
*
* When woken, the thread will run @threadfn() with @data as its
* argument. @threadfn() can either call do_exit() directly if it is a
* standalone thread for which noone will call kthread_stop(), or
* return when 'kthread_should_stop()' is true (which means
* kthread_stop() has been called).  The return value should be zero
* or a negative error number; it will be passed to kthread_stop().
*
* Returns a task_struct or ERR_PTR(-ENOMEM).
*/
struct task_struct *kthread_create(int (*threadfn)(void *data),
        void *data,
        const char namefmt[],
        ...)
{
        struct kthread_create_info create;

        create.threadfn = threadfn;
        create.data = data;
        init_completion(&create.done);

        spin_lock(&kthread_create_lock);
        list_add_tail(&create.list, &kthread_create_list);
        spin_unlock(&kthread_create_lock);

        wake_up_process(kthreadd_task);
        wait_for_completion(&create.done);

        if (!IS_ERR(create.result)) {
                struct sched_param param = { .sched_priority = 0 };
                va_list args;

                va_start(args, namefmt);
                vsnprintf(create.result->comm, sizeof(create.result->comm),
                        namefmt, args);
                va_end(args);
                /*
                 * root may have changed our (kthreadd's) priority or CPU mask.
                 * The kernel thread should not inherit these properties.
                 */
                sched_setscheduler_nocheck(create.result, SCHED_NORMAL, ¶m);
                set_cpus_allowed_ptr(create.result, cpu_all_mask);
        }
        return create.result;
}

EXPORT_SYMBOL(kthread_create);
kthread_create() принимает указатель на функцию, которая должна выполняться в отдельном потоке (threadfn), указатель на данные для потока (data) и имя нового потока (namefmt). Сперва kthread_create() инициализирует поля переменной create типа struct kthread_create_info. Затем на время добавления нового элемента в список потоков, "ждущих" создания, kthread_create() взводит спин-лок, чтобы никто больше не мог добавить новые элементы и внести сумятицу в наши дела :) Новый элемент списка добавляется в хвост с помощью макроса list_add_tail(). Затем спин-лок снимается - список снова свободен. Далее, kthread_create() будит поток kthreadd с помощью wake_up_process(), который должен будет проверить очередь и запустить новый поток, как описывалось выше. Ну и наконец, если при создании нового потока не возникло ошибок, то подготавливаем такие реквизиты нового потока, как имя и параметры планирования. Оставив новый поток в состоянии сна, возвращаем управления. Вот и всё, что делает kthread_create(). Если кратко, то она ставит в очередь новый запрос на создание потока, дожидается, пока не отработает рабочий поток kthreadd и не будет создан новый спящий поток.

На этом матрёшка не заканчивается. В упомянутой функции kthreadd() мы сознательно пропустили одно место:

        spin_unlock(&kthread_create_lock);
        create_kthread(create);
        spin_lock(&kthread_create_lock);
create_kthread() - ещё одна вспомогательная внутренняя функция, которая с помощью уже знакомого нам вызова kernel_thread() создаёт реальный поток. Но, не всё так просто. На самом деле, здесь создаётся не тот поток, который указывался в качестве аргумента threadfn для kthread_create()! Создаётся всего лишь новый поток kthread - опять же, внутренняя неэкспортируемая за пределы единицы трансляции функция :)
static void create_kthread(struct kthread_create_info *create)
{
        int pid;

        /* We want our own signal handler (we take no signals by default). */
        pid = kernel_thread(kthread, create, CLONE_FS | CLONE_FILES | SIGCHLD);
        if (pid < 0) {
                create->result = ERR_PTR(pid);
                complete(&create->done);
        }
}
По результату, который будет положительным числом - идентификатором процесса (pid) в сучае успеха или отрицательным - код ошибки, узнаём, как всё прошло.

Что же делает kthread()? Не так и много. Сначала, новый поток копирует все необходимые данные из переданного ему аргумента _create - т.е., адрес функции потока (threadfn) и данные для потока (data). Во внутреннюю пемеременную self типа struct kthread записываем "состояние" потока - should_stop - не 0, если поток должен быть остановлен и exited - код возврата, если поток завершился.

static int kthread(void *_create)
{
        /* Copy data: it's on kthread's stack */
        struct kthread_create_info *create = _create;
        int (*threadfn)(void *data) = create->threadfn;
        void *data = create->data;
        struct kthread self;
        int ret;

        self.should_stop = 0;
        init_completion(&self.exited);
        current->vfork_done = &self.exited;

        /* OK, tell user we're spawned, wait for stop or wakeup */
        __set_current_state(TASK_UNINTERRUPTIBLE);
        create->result = current;
        complete(&create->done);
        schedule();

        ret = -EINTR;
        if (!self.should_stop)
                ret = threadfn(data);

        /* we can't just return, we must preserve "self" on stack */
        do_exit(ret);
}
Здесь:
        /* OK, tell user we're spawned, wait for stop or wakeup */
        __set_current_state(TASK_UNINTERRUPTIBLE);
        create->result = current;
        complete(&create->done);
        schedule();
мы устанавливаем состояние потока в TASK_UNINTERRUPTIBLE (спящий процесс). В описатель задачи - result - записываем указатель на текущий контекст (ведь когда kthread была запущена через kernel_thread(), у нас уже свой контекст выполнения для данного экземпляра kthread()). Далее, сигнализируем о завершении инициализации описателя нового процесса и состояния задачи с помощью complete() (здесь я намеренно пока не углубляюсь в то, что такое атомарное ожидание). Просим ядро выполнить перепланирование процессов. В этом месте, по сути, выполнение нашего нового потока приостанавливается, т.к. планировщик не будет выделять ему процессорное время ввиду того, что поток спит... Следущие строки будут выполнены только после того, как поток будет разбужен:
         ret = -EINTR;
        if (!self.should_stop)
                ret = threadfn(data);

        /* we can't just return, we must preserve "self" on stack */
        do_exit(ret);
Тут, как будто, ничего мистического нет. Сразу, как только поток вновь получит процессор в своё владение (будет кем-то разбужен), в переменную-код возврата мы записываем код ошибки EINTR - "процесс прерван". Затем необходимо проверить, не успел ли кто-то отменить выполнение потока. Если нет, то наконец-то выполняем именно нашу функцию - threadfn(), передавая ей в качестве аргумента данные для работы. Ну а после этого - делаем do_exit() после того, как функция threadfn() возратит управление.

Такова подсистема поочередного запуска потоков в общих чертах. Всё остальное достаточно просто:

  • void kthread_bind(struct task_struct *k, unsigned int cpu) - создать поток, привязанный к конкретному процессору;
  • int kthread_stop(struct task_struct *k) - запросить останов потока;
  • int kthread_should_stop(void) - проверка, был ли запрошен останов потока (удобно использовать внутри самого потока);
  • kthread_run(threadfn, data, namefmt, ...) - макрос, который делает то же, что kthread_create() с той лишь разницей, что поток сразу будет пробуждён.

Monday, April 12, 2010

Linux - Потоки ядра

Когда-то всё повторится с вероятностью 99%,
но в других обстоятельствах, уже с другими людьми,
по другому поводу, но с той же целью –
стукнуть человека граблями по лбу,
чтобы не забылся человек. ©

Пожалуй, пришло время обсудить ещё одну интересную тему, касающуюся ядра Linux. И тема эта – потоки ядра. Не станем тянуть кота за хвост, а сразу перейдём к делу. Во-первых, выясним для себя, что такое поток? Для того, чтобы ответить на этот вопрос, зададимся другим вопросом, связанным с первым. Что такое процесс? В современных мультизадачных операционных системах есть возможность запустить несколько приложений, которые будут работать какбы параллельно, не мешая друг другу. Можно, допустим, запустить плейер, слушать музыку и одновременно набивать текст в редакторе. Что-то вроде того, что делаю я сейчас :) Но само приложение, это, грубо говоря, всего лишь образ на диске, файл, который необходимо открыть и запустить программу действий, хранящихся в нём. Такие действия есть ничто иное, как инструкции процессора. Но это ещё не процесс. Процесс – некая сущность, абстрактная по своей природе, которой оперирует операционная система. Если посмотреть на классические определения, то процесс – это совокупность кода (инструкция процессора) и ресурсов, выделенных во владение данному коду. Ведь инструкции должны в свою очередь также чем-то оперировать, верно? Где-то необходимо хранить промежуточные и конечные результаты работы. По обстоятельствам, хранить их можно либо на диске (файлы), либо в оперативной памяти. В данной случае блоки памяти, выделенные в распоряжение коду и дескрипторы файлов (то, посредством чего процесс ссылается на объекты ядра - файлы) – это и есть ресурсы. Разумеется, всё многообразие ресурсов, которыми может владеть процесс, не исчерпывается памятью и файловыми дескрипторами. Оставим в покое такие тривиальные вещи, как память и файлы и добавим, что для процесса, т.е. выполняющегося кода, есть ещё один крайне важный ресурс – процессорное время. Каждый процесс имеет в своём распоряжении определённый квант времени, на протяжении которого он волен делать с процессором всё (ну, или точнее, почти всё), что угодно, в том числе, он может отказаться от своего кванта и отдать его другому процессу. Тот, кто занимается выделением квантов времени процессам, управляет очередью, в которой за временем стоят процессы и непосредственно ведает величиной кванта времени, который будет выделен тому или иному процессу – это ядро операционной системы. Если быть точнее, подсистема ядра, занимающаяся планированием процессов или попросту – планировщик (scheduler). Процессы, таким образом, работают отнюдь не одновременно и параллельно (для простоты отвлечёмся от того факта, что в последнее время широкое распространение получили SMP-системы, основанные на многоядерных процессорах, где истинный параллелизм выполнения кода действительно возможен). Вместо этого планировщик последовательно переключает процессы, давая возможность каждому из них на какое-то время воспользоваться центральным процессором в своих целях. Так как происходит это достаточно быстро, то создаётся иллюзия, что несколько процессов работают одновременно. Таким образом, процесс – это логическая сущность, нечто, что в данный момент выполняется. Однако, это не предельная сущность. В современных операционных системах почти повсеместно имеет место ещё одна сущность – поток. Поток, это приближённо говоря часть процесса, выполняющаяся параллельно другим его частям. Если мы возьмём такой классический пример, как GUI-приложение, то один поток в таком процессе может, допустим, отрисовывать пользовательский интерфейс, в то время, как второй поток будет заниматься действительно полезной работой :) Взаимодействуя друг с другом в таком процессе можно обеспечить актуальное состояние графического интерфейса, отражающего прогресс выполнения задачи – ну хоть отрисовывать прогресс-бар, к примеру :) В терминах многопоточного программирования у приложения всегда есть как минимум один поток – главный. Главный поток в процессе своей деятельности может порождать вторичные потоки. В зависимости от платформы поддержка потоков может быть реализована на уровне ядра операционной системы, либо сугубо средствами библиотек. Если поддержка процессов ядром неизбежна, то, как было сказано, потоки вовсе не обязаны поддерживаться данной ОС. Потоки очень похожи на процессы, но есть одно весьма важное отличие. Каждый процесс владеет своим набором ресурсов и ресурсы эти недоступны другим процессам, выполняющимся в системе. Так должно быть в идеале и так есть, за исключением тех случаев, когда приложения сами заинтересованы в разделении своих ресурсов (межпроцессное взаимодействие - IPC). С потоками дело обстоит иначе. Все потоки данного процесса разделяют общие ресурсы, которые имеются в распоряжении данного процесса – память, дескрипторы файлов, сокетов, и проч. После небольшого вводного курса обратимся к объекту наших изысканий – ядру Linux. Изначально в ядре Linux не было поддержки потоков. Вместо этого потоки реализовывались средствами стандартной библиотеки POSIX – pthread. С одной стороны, такой подход хорош тем, что устраняется зависимость приложения от конкретного ядра, а с другой, отсутствие поддержки со стороны ядра сильно усложняет реализацию самой библиотеки, ведь необходимо как-то обеспечить хитроумные механизмы выполнения потоков, межпоточной синхронизации и проч. Позже ситуация изменилась и в ядре появилась концепция так называемых облегчённых процессов (lightweight process). В отличие от ядра NT (то, которое Windows) в ядре Linux нет выделенной концепции потока. В качестве потока как единица планирования на уровне ядра выступает упомянутый облегчённый процесс. Если не вдаваться в детали, то облегчённый процесс подобен обычному, за исключением того, что ресурсы облегчённых процессов разделяются между собой. Т.о., облегчённый процесс и есть интерпретация концепции потока в рамках ядра Linux. Благодаря введению облегчённых процессов многие вещи упростились, т.к. теперь ядро способствует процессам в распараллеливании задач. Теперь упростилась реализация пользовательских библиотек для поддержки многопоточности, можно спокойно использовать объекты блокировки ядра, создавать пулы потоков и др. Теперь мы подошли к тому, чтобы ответить, что из себя представляют потоки ядра, чем они отличаются от пользовательских потоков, кому и зачем всё это вообще нужно? В принципе, потоки ядра очень похожи на пользовательские процессы за исключением того обстоятельства, что свои данные потоки ядра хранят в памяти самого ядра и, соответственно, имеют доступ к виртуальному адресному пространству ядра, они т.о. имеют доступ к функциям ядра и его структурам данных. Поток для ядра – это возможность запустить на фоне определённую задачу, которая будет выполняться не непрерывно, а, допустим, ожидая наступления какого-то события, будет «спать», т.е., асинхронно. Как и в случае с процессами, потоки какое-то время способны владеть процессом, пока они не будут вытеснены другим потоком, что имеет место в случае т.н. вытесняющей многозадачности. Ниже мы рассмотрим, как используются потоки ядра и что они делают на конкретных примерах и с привлечением сведений о том, что такое очереди выполнения, ожидания и состояния процесса.
Для начала посмотрим, что есть в нашей системе на базе ядра Linux ветки 2.6. Сделать это просто, если запустить ps с ключами “e” и “f”:
$ ps –ef

UID        PID  PPID  C STIME TTY          TIME CMD
root         1     0  0 22:36 ?        00:00:00 init [3]
root         2     1  0 22:36 ?        00:00:00 [ksoftirqd/0]
root         3     1  0 22:36 ?        00:00:00 [events/0]
root        38     3  0 22:36 ?        00:00:00 [pdflush]
root        39     3  0 22:36 ?        00:00:00 [pdflush]

…
root      4066  4015  0 22:59 tty3     00:00:00 ps -ef
В приведённом листинге потоки ядра это те, чьи имена взяты в квадратные скобки. Допустим, поток ядра ksoftirqd занимается обеспечением системы обработки мягких прерываний по IRQ. Суть идеи состоит в том, что как известно, в ядре Linux реализована следующая модель прерываний. Обработчик прерывания имеет две «половины» - верхнюю и нижнюю. Верхняя половина прерывания требует сиюминутной обработки, в то время, как обработка нижней половины может быть отложена и её ядро не обязано обработать мгновенно. Откладывая т.о. выполнение части кода, отвечающей за обработку прерываний, система экономит время, которое может быть отдано под выполнение более приоритетных на данный момент задач. В функции потока ksoftirqd входит наблюдение за тем, чтобы все запросы на прерывания были обслужены с одной стороны, и с другой, чтобы система не загружалась до предела такими запросами. Номер после слэша – это число, идентифицирующее поток, т.к. в SMP-системах на каждом процессоре выполняется свой экземпляр потока ksoftirqd. Если в Вашей системе 4 процессора, либо 4-соответственно в системе будут 4 потока ksoftirqd – ksoftirqd/0-ksoftirqd/3, которые будут привязаны каждый к своему процессору или процессорному ядру. Ещё один поток ядра – events, обеспечивает централизованную поддержку рабочих очередей. Если какая-то часть ядра желает отложить выполнение работы, то эта часть может либо создать собственную рабочую очередь, либо поставить задание в очередь, созданную потоком events. Ещё один пример того, как ядро создаёт потоки для фоновых задач – pdflush. Так как известно, что по быстродействию память неизмеримо лучше, чем диск, то в целях соблюдения баланса производительности данные, предназначенные для записи на накопитель не попадают туда мгновенно. Вместо этого данные по запросам записи накапливаются в кэше. pdflush как раз и занимается синхронизацией буферного кэша с содержимым диска. Поток pdflush сбрасывает на диск т.н. «грязные» страницы кэша, т.е. те, в которые производилась запись. При этом pdflush следит, чтобы объём свободной памяти не слишком понижался и чтобы страницы с данными не находились в кэше больше определённого времени. Если объём доступной памяти уменьшается, а в кэше находятся данные, ожидающие записи или если некоторые страницы задержались, то pdflush обеспечивает запись кэшированных данных на диск. Из ранее приведённого листинга видно, что в системе есть 2 экземпляра pdflush, но без номеров, которые бы указывали на привязку к процессорам. На самом деле, новые экземпляры потока pdflush создаются по мере роста нагрузки на систему ввода/вывода, либо когда один поток сильно загружен. khubd – часть подсистемы USB ядра, которая отслеживает подключение к USB-хабу устройств и занимается конфигурированием устройств, поддерживающий горячее подключение (hot-plugged). Наконец, kjournald – это поток журналирования, используемый ядром в реализация файловых систем с поддержкой журналирования, как например ext3.
Для того, чтобы лучше понять суть сказанного, попробуем реализовать свой собственный поток ядра и начнём с постановки задачи: допустим, нам нужен такой поток, который бы вёл наблюдение за состоянием ядра и асинхронно с помощью вспомогательной пользовательской программы уведомлял Вас, что определённые структуры ядра находятся в состоянии, не вполне хорошем или неудовлетворительном. Как например ситуация, когда свободное место в принимающем буфере сетевого драйвера почти иссякло. Чтобы решить поставленную задачу необходимо следующее:
  1. код должен быть оформлен, как фоновое задание, которое ожидает наступления некого асинхронного события;
  2. наш код должен иметь доступ к структурам данных ядра, ведь определение критического уровня буфера выполняется другими частями ядра;
  3. наш код должен вызывать вспомогательное пользовательское приложение, что затратно в плане времени.

Наш поток освобождает процессор и пробуждается только тогда, когда наступает ожидаемое событие, например, изменение в структуре, за которой мы ведём наблюдение. Пробудившись, поток должен вызвать вспомогательное приложение, которое и обязано уведомить пользователя.
Приведённый далее вызов создаёт поток ядра:
ret = kernel_thread (mykthread, NULL, CLONE_FS | CLONE_FILES | CLONE_SIGHAND | SIGCHLD);
Наш поток можно создать там, где это наиболее подходит. Ну, хотя бы в init/main.c. Набор флагов, передаваемый функции kernel_thread(), определяет, какие ресурсы должны разделяться между родителем и его потоками-потомками.
CLONE_SIGHAND – предписывает сделать общими обработчики сигналов Unix;
CLONE_FILES – разделять дескрипторы открытых файлов;
CLONE_FS – родитель и потомок используют общую информацию о файловой системе. Сюда входит корневой каталог, текущий каталог и параметр umask процесса. Любой вызов chroot(2), chdir(2), umask(2), выполненный либо потомком, либо родителем влияет также и на другой процесс.
В коде, приводимом далее, функция ядра daemonize() создаёт поток без ассоциированных с ним пользовательских ресурсов. Следующий вызов reparent_to_init() изменяет родителя взывающего потока на поток init. Каждый поток (равно, как и процесс) в Linux (да и не только в нём) имеет единственного родителя. В случае, когда родительский процесс завершается, не дождавшись окончания работы своего дочернего потока (процесса), такой осиротевший процесс становится зомби. Проще говоря, процесс-зомби уже не планируется на выполнение, но в то же время запись о процессе, поставленная ему в соответствие и описывающая его не удаляется. Вся информация сохраняется, хотя, по существу, процесс уже завершён и больше никому не нужен. Переназначение процесса-родителя позволяет избежать таких неприятностей. В ядрах ветки 2.6 функция daemonize() сама вызывает reparent_to_init(). Так как вызов daemonize() по умолчанию блокирует все сигналы, нам необходимо вызвать функцию allow_signal(), чтобы указать, какие сигналы можно доставлять нашему потоку. Т.к. в ядре нет обработчиков сигналов, как например, в glibc, необходимо использовать специальную функцию signal_pending(), с помощью которой можно определить, был ли потоку послан сигнал и если да, то обработать его соответственно. В нашем примере мы обрабатываем только SIGKILL, по которому поток завершается.
static DECLARE_WAIT_QUEUE_HEAD (myevent_waitqueue);
rwlock_t myevent_lock;

static int mykthread (void *unused)
{
        unsigned int event_id = 0; 

        DECLARE_WAITQUEUE (wait, current);

        /* Код, необходимый для того, чтобы наш код стал потоком ядра 
        * без каких-либо ресурсов, присущих пользовательским процессам */
        daemonize (”mykthread”);
        reparent_to_init (); /* Для ядер ветки 2.4 */

        /* Запросить доставку сигнала SIGKILL */
        allow_signal (SIGKILL);

        /* Поток спит в очереди ожидания, пока он не будет разбужен 
        * той частью ядра, которая непосредственно имеет дело с интересующей
        * нас структурой данных */
        add_wait_queue (&myevent_waitqueue, &wait);

        for (;;) {
                /* Освободить процессор, пока не наступит ожидаемое событие */
                set_current_stat (TASK_INTERRUPTIBLE);
                schedule ();

                /* Выход, если получен сигнал SIGKILL */
                if (signal_pending (current)) break;

                /* Сюда переходит управление, когда поток разбужен  */
                read_lock (&myevent_lock); /* Начало критической секции */

                if (myevent_id) { /* Защита от "хвостовых" (повторных) пробуждений */
                        event_id = myevent_id;
                        read_unlock (&myevent_lock); /* Здесь заканчивается критическая секция */

                        /* Вызвать зарегистрированное пользовательское приложение-хелпер
                        * передав ему через переменные окружения необходимую информацию */
                        run_umode_handler (event_id); /* См. далее */
                } else {
                        read_unlock (&myevent_lock); 
                }
        }

        set_current_state (TASK_RUNNING);
        remove_wait_queue (&myevent_waitqueue, &wait);
        return 0;
}

Если Вы откомпилируете приведённый код в составе ядра, то при загрузке с новым ядром по команде ps сможете увидеть свой новый поток, который выполняется, как потомок init:
$ ps –ef

UID        PID  PPID  C STIME TTY          TIME CMD
root         1     0  0 21:56 ?        00:00:00 init [3]
root         2     1  0 22:36 ?        00:00:00 [ksoftirqd/0]
…
root         111   1  0 21:56 ?        00:00:00 [mykthread]
…

Прежде чем снова углубляться в детали реализации потоков ядра, взглянем на код ниже:
/* Выполняется теми частями ядра, которые владеют необходимыми данными, которые мы мониторим */

        /* … */

        if (my_key_datastructure looks troubled) {
                write_lock (&myevent_lock); 
                /* Записываем наблюдаемую структуру */
                myevent_id = datastructure_id;
                write_unlock (&myevent_lock);

                /* Разбудить поток mykthread */
                wake_up_interruptible (&myevent_waitqueue); 
        }

        /* … */
Полезную работу ядро выполняет используя контекст процесса или контекст прерывания. Причём контекст процесса и контекст прерывания не связаны друг с другом. Чтобы не вносить путаницу, необходимо вероятно пояснить, что когда речь идёт о контексте прерывания, то подразумевается не обязательно сам факт прерывания а всего лишь контекст, в котором ядро выполняет отложенные вызовы. Самый первый кусок кода нашей реализации потока выполняется в контексте потока. В то время как код, приведённый абзацем ранее, использует контекст прерывания для обработки отложенной функции и в равной же степени он может быть использован в контексте процесса. Оба вида контекстов связываются посредством структур данных ядра. myevent_id и myevent_waitqueue в приведённом нами коде используются для связывания контекстов. Доступ к переменной myevent_id управляется спин-блокировкой (spin lock). Потоки ядра выполняются в режиме вытесняющей многозадачности только если при конфигурировании ядра включен параметр CONFIG_PREEMPT. Без включения этого параметра и патча вытесняющей многозадачности в ядрах ветки 2.4 наш поток заморозит всю систему, если он сам не заснёт. Если в первом куске кода, приводимом в начале этой статьи, закомментировать вызов schedule(), то с выключенным параметром CONFIG_PREEMPT ядро также окажется заблокированным.
Давайте ещё раз посмотрим на код, который усыпляет наш поток mykthread до наступления события.
        add_wait_queue (&myevent_waitqueue, &wait);

        for (;;) {
                /* .. */
                set_current_state (TASK_INTERRUPTIBLE);
                schedule ();
                /* Точка A */

                /* .. */
        }

        set_current_state (TASK_RUNNING);
        remove_wait_queue (&myevent_waitqueue, &wait);
В очередях ожидания хранятся записи потоков, которые ожидают наступления события или системного ресурса. Поток в очереди ожидания спит до тех пор, пока он не будет разбужен обработчиком прерывания или другим потоком, который ведёт наблюдение за каким-либо объектом. Постановка в очередь и удаление из неё осуществляются с помощью функций add_wait_queue() и remove_wait_queue() соответственно. Пробуждение поставленного в очередь потока/процесса выполняется функцией wake_up_interruptible(). В приведённом выше кусочке кода set_current_state() необходима для того, чтобы установить состояние потока «выполняется».
Поток ядра (равно, как и обычный процесс) может находиться в одном из следующих состояний:
  • TASK_RUNNING: Процесс выполняется (использует процессор) или находится в очереди выполнения, ожидая выделения процессорного времени.
  • TASK_INTERRUPTIBLE: Процесс приостановлен до наступления определенного события. Это состояние может быть прервано сигналами. После получения сигнала или возобновления путем явного выполнения "пробуждающего" вызова процесс переходит в состояние TASK_RUNNING.
  • TASK_UNINTERRUPTIBLE: Данное состояние аналогично TASK_INTERRUPTIBLE, с той лишь разницей, что в нем не происходит обработка сигналов. Прерывание процесса, находящегося в этом состоянии, может быть нежелательным, поскольку ожидание может быть частью некоторой важной задачи. При наступлении ожидаемого события процесс возобновляется путем явного выполнения "пробуждающего" вызова.
  • TASK_STOPPED: Выполнение процесса остановлено, он не выполняется и не может начать выполняться. Процесс переходит в это состояние по получении таких сигналов, как SIGSTOP, SIGTSTP и т.д. Процесс сможет снова стать исполняемым после получения сигнала SIGCONT.
  • TASK_TRACED: Процесс находится в этом состоянии при выполнении его мониторинга такими процессами, как, например, отладчики.
  • EXIT_ZOMBIE: Процесс завершен. Он будет находиться в системе до момента получения родительским процессом статистической информации по его выполнению.
  • EXIT_DEAD: Конечное состояние (соответствующее своему названию). Процесс переходит в это состояние при его удалении из системы после того, как родительский процесс получит всю статистическую информацию, выполнив системный вызов wait4() или waitpid().
Кстати, начиная с версии ядра 2.6.25 введено новое состояние процесса – TASK_KILLABLE если процесс приостановлен с приоритетом, допускающим прерывания, он ведет себя так, как если бы находился в состоянии TASK_UNINTERRUPTIBLE, но вдобавок имеет возможность обрабатывать фатальные сигналы.
Вернёмся к нашему потоку. mykthread спит, находясь в очереди ожидания (myevent_waitqueue) и изменяет своё состояние на TASK_INTERRUPTIBLE, давая понять, что он отказывается от ожидания в очереди выполнения и, стало быть, квантов времени, которые ему может выделить планировщик выполнения процессов. Вызов функции schedule() необходим для того, чтобы планировщик выбрал другой процесс из очереди выполнения. Когда другая часть ядра будит mykthread с помощью wake_up_interruptible(), поток помещается обратно в очередь выполнения планировщика, а состояние задачи меняется на TASK_RUNNING, т.о. ситуация гонок отсутствует даже в том случае, когда процесс пробуждается, находясь в состоянии TASK_INTERRUPTIBLE (к слову: механизм пробуждения процесса из очереди ожидания, а точнее, механизм самой очереди ожидания предотвращает ситуацию, известную, как "стадо перед грозой", когда несколько процессов ждут одного события - например, освобождения ресурса. Когда ядро будит их, при эсклюзивном доступе к ресурсу получается, что большинство процессов пробуждаются лишь для того, чтобы поучаствовать в гонке за ресурсом и не солоно хлебавши продолжить спать - таких процессов из чсла разбуженных будет большинство, ведь победитель возможен только один) и происходит вызов функции планирования - schedule(). Поток перемещается также в очередь выполнения, если ему доставлен сигнал SIGKILL. Когда планировщик в порядке очерёдности извлекает поток mykthread из очереди выполнения, выполнение потока продолжается в точке А.
Теперь, для полной реализации задуманного, всё, что нам остаётся сделать, это реализовать вспомогательное приложение, которое будет запускаться потоком ядра и, собственно, уведомлять пользователя о событии. Итак, хорошая новость. Для нашего удобства ядро поддерживает механизм вызова пользовательских приложений, если требуется выполнить какие-то определённые действия. Например, если включена функция автоматической подгрузки модулей, то ядро по необходимости динамически подгружает необходимые модули, используя для этой цели пользовательский загрузчик модулей. По умолчанию, загрузчик модулей - /sbin/modprobe, но ничто не мешает нам заменить его на другой, зарегистрировав альтернативный загрузчик с помощью файла /proc/sys/kernel/modprobe. Аналогичным образом ядро уведомляет пользовательское окружение о событиях, связанных с устройствами, поддерживающими горячее подключение (hot-plug). В данном случае вспомогательное приложение по умолчанию - /sbin/hotplug, которое можно заменить, переписав содержимое файла /proc/sys/kernel/hotplug. В приведённом далее коде реализована функция, с помощью которой поток ядра mykthread уведомляет пользовательское окружение о наступлении события. Приложение, которое необходимо вызвать, регистрируется в виртуальной файловой системе /proc системным вызовом sysctl, если конечно при конфигурировании ядра был разрешён интерфейс sysctl (CONFIG_SYSCTL). Для того, чтобы добавить свой sysctl-параметр ядра, необходимо добавить новый элемент в массив kern_table в файле kernel/sysctl.c:
{
 KERN_MYEVENT_HANDLER, "myevent_handler", &myevent_handler, 256,
 0644, NULL, &proc_dostring, &sysctl_string
}

Благодаря этому при загрузке в модифицированное ядро на файловой системе /proc появится такой файл: /proc/sys/kernel/myevent_handler.
Чтобы зарегистрировать свой обработчик, достаточно в командной строке вполнить следующую команду:
$ echo /path/to/kernel-helper > /proc/sys/kernel/myevent_handler

Т.о., при наступлении ожидаемого события будет выполнен наш /path/to/kernel-helper.
static void run_umode_handler (int event_id)
{
        int i = 0;
        char *argv[2], *envp[4], *buffer = NULL;
        int value;

        argv[i++] = myevent_handler; /* Определено в kernel/sysctl.c */

        /* Запись идентификатора структуры */
        if (!(buffer = kmalloc (32, GFP_KERNEL))) return;
        sprintf (buffer, "TROUBLED_DS=%d", event_id);

        /* Если пользовательские обработчики не зарегистрованы, то возврат управления */
        if (!argv[0]) return;
        argv[i] = 0;

        /* Подготовка окружения для /path/to/kernel-helper */
        i = 0;
        envp[i++] = "HOME=/";
        envp[i++] = "PATH=/sbin:/bin:/usr/sbin:/usr/bin";
        envp[i++] = buffer;
        envp[i] = 0;

        /* Запустить пользовательское приложение, /path/to/kernel-helper */
        value = call_usermodehelper (argv[0], argv, envp, 0);

        /* Check return values */
         …

         kfree (buffer);
}
Информацию о структуре данных ядра, которая по каким-то причинам может вызвать у нас интерес, мы передаём пользовательскому процессу через переменную окружения (TROUBLED_DS). В принципе, вспомогательным приложением может быть даже простой шелл-скрипт, который уведомит пользователя по эл.почте о наступившем событии, при чём информацию о наблюдаемой структуре данных ядра он также сможет спокойно получить из переменной окружения:
#!/bin/bash
echo Kernel datastructure $TROUBLED_DS  
  is in trouble | mail –s Alert root
Функция call_usermodehelper() выполняется в контексте запускаемого процесса и работает с полномочиями пользователя root (root capabilities). В ядрах ветки 2.6 этот механизм реализуется как очередь заданий (work queue).
Вот мы и рассмотрели в довольно общих чертах потоки ядра, зачем они нужны, как создаются, работают и используются. Но здесь мы привели лишь отрывочные фрагменты кода и немного теории. Самый лучший способ изучить всё в деталях – обратиться к исходному коду ядра. Для тех, кого всё описанное заинтересовало, упоминаемые в тексте потоки ядра ksoftirqd, pdflush, и khubd реализованы соответственно в kernel/softirq.c, mm/pdflush.c и drivers/usb/core/hub.c.
Код функции daemonize() можно найти в файле kernel/exit.c для ядер ветки 2.6 и в файле kernel/sched.c, если речь о версии 2.4. Если Вас интересует механизм вызова пользовательских приложений, хелперов ядра, милости просим обратить внимание на код в файле kernel/kmod.c. Вот, пожалуй, и всё. Надеюсь, было интересно и, главное, полезно :)

Ссылки.

[1] Материалы из статьи Sreekrishnan Venkateswaran на http://www.linux-mag.com/ (увы, не помню точно адрес статьи);
[2] "Ядро LINUX", Д.Бовет, М.Чезати, 3-е изд.;
[3] Исходные коды ядра Linux (2.6.25-33), как всегда.

Saturday, February 13, 2010

Когда память работает на Вас: использование ramfs и tmpfs

Если производительность операций дискового ввода/вывода не соответствует Вашим нуждам, то наиболее дешёвое и наименее затратное по времени решение - это разместить очень часто используемые файлы в оперативной памяти (RAM). Чтение и запись в память значительно быстрее, чем соответствующие операции на дисковой файловой системе. Операции дискового ввода/вывода, как те, что используются при работе с базами данных, позволяют улучшить производительность, если перенести данные в файловую систему, находящуюся в оперативной памяти.

Почему именно оперативная память? Оперативная память быстра. Она работает на скоростях доступа порядка наносекунд. Оперативная память (RAM) не вращается. Дисковые накопители содержат механизмы, которые находятся в движении. Это означает, что операции чтения и записи, требующие поиска, значительно медленнее нежели работа с RAM. Для примера, память DDR3 позволяет перемещать данные со скоростью выше 10ГБ/с. Даже самый быстрый жёсткий диск фирмы Hitachi - UltraStar, работающий на скорости 15000 об/м работает с данными в среднем на скоростях от 119МБ/с до 198МБ/с, а пиковая скорость - максимум 600МБ/с. RAM имеет большее среднее время безотказной работы. Так как RAM не содержит механических частей, то её показатель наработки на отказ гораздо лучше, чем у дисковых накопителей.

В качестве файловых систем, работающих в RAM, в Вашем распоряжении есть tmpfs и ramfs. Давайте посмотрим, как можно настроить файловую систему в RAM и попутно избежать некоторых общих проблем при использовании RAM-ФС.

ramfs

tmpfs и ramfs работают по-разному. ramfs может использовать только системную память, данные о ней не отображаются по команде df -h, она также не имеет ограничений по размеру и не выдаёт сообщений об ошибках при установке опциональных ограничений (т.е., ограничение размера файловой системы, задаваемое при монтировании, как показано ниже - перев.). То обстоятельство, что ramfs не имеет ограничений по размеру и Вы никогда не увидите предупреждающих сообщений при установке опциональных ограничений может показаться противоречивым, но на самом деле это не так. Вы можете ограничить размер файловой системы ramfs, но Вы не получите предупреждения при превышении этого ограничения, равно как сама система не будет делать ничего, чтобы предотвратить превышение ограничения.

Для примера синтаксис команды монтирования новой файловой системы таков:

# mount -t fs_type device mount_dir

Синтаксис команды mount для настройки RAM-ФС размером в 200МБ для БД в каталоге /opt/data будет таким:

# mount -t ramfs -o size=200m ramfs /opt/data

Как говорилось ранее, эта ФС не будет отображена по команде df -h. Единственный способ обнаружить её - команда mount.

# mount

/dev/mapper/VolGroup00-LogVol00 on / type ext3 (rw)
proc on /proc type proc (rw)
sysfs on /sys type sysfs (rw)
devpts on /dev/pts type devpts (rw,gid=5,mode=620)
/dev/sda1 on /boot type ext3 (rw)
none on /proc/sys/fs/binfmt_misc type binfmt_misc (rw)
sunrpc on /var/lib/nfs/rpc_pipefs type rpc_pipefs (rw)
ramfs on /opt/data type ramfs (rw,size=200m)

Если Вы попытаетесь записать на подмонтированную таким образом ФС более 200МБ данных, запись продолжится без каких-либо предупреждений со стороны системы. Предел в 200МБ является избыточным и никак не влияет на реальный размер ФС или на то, сколько данных Вы можете на неё записать. Это является главным недостатком ramfs.

Системные администраторы слишком занятые работой, могут захламлять ФС различными файлами - после установки патчей, ПО, или после продолжительного тестирования системы. Для преодоления этой проблемы ramfs является эффективным решением. Все файлы, в длительном хранении которых на дисковом накопителе нет необходимости, могут размещаться на временной ФС в памяти (ramfs). После размонтирония всё содержимое RAM-ФС утрачивается, а все ненужные файлы т.о. разом исчезают из системы, не засоряя её.

Небольшое замечание по поводу ramfs: хоть формально при монтировании ramfs Вы можете указать размер файловой системы, Вам необходимо следить за её заполненностью. И вот почему: размер ramfs увеличивается динамически и система не предотвратит переполнение ФС, как было сказано. Допустим, у Вас 2Гб оперативной памяти, у Вас на /tmp/ram подмонтирована ramfs на 1Гб. Когда данные на Вашей ramfs перевалят за 1Гб, Вы всё ещё сможете дописывать что-то новое в /tmp/ram. Система будет молча выполнять всё, что Вы ей скажете. Однако, когда объём данных превысит объём физической памяти (2Гб в нашем случае), система может зависнуть, т.к. в RAM больше физически нет места для хранения данных. Имейте это ввиду и считайте, что Вас предупредили.

tmpfs

Для операций с реальными данными наиболее предпочтительным выбором является tmpfs, нежели ramfs. В отличие от последней, tmpfs имеет фиксированный размер, данные, хранимые на ней, могут размещаться как в системной памяти, так и в разделе подкачки. При превышении размера ФС система выдаёт сообщения об ошибке превышения размера ФС. Синтаксис монтирования tmpfs похож на уже виденный нами:

# mount -t tmpfs -o size=200mb tmpfs /opt/data

df -h показывает сведения о подмонтированной tmpfs так же, как о любой другой реальной ФС:

# df -h

Filesystem            Size  Used Avail Use% Mounted on
/dev/mapper/VolGroup00-LogVol00
                      360G  225G  117G  66% /
/dev/sda1              99M   25M   70M  27% /boot
tmpfs                 200M     0  200M   0% /opt/data

mount показывает следующую информацию:

# mount

/dev/mapper/VolGroup00-LogVol00 on / type ext3 (rw)
proc on /proc type proc (rw)
sysfs on /sys type sysfs (rw)
devpts on /dev/pts type devpts (rw,gid=5,mode=620)
/dev/sda1 on /boot type ext3 (rw)
none on /proc/sys/fs/binfmt_misc type binfmt_misc (rw)
sunrpc on /var/lib/nfs/rpc_pipefs type rpc_pipefs (rw)
tmpfs on /opt/data type tmpfs (rw,size=200m)

Когда объём записываемых на ФС данных превышает размер самой tmpfs, система выдаёт сообщение "No space left on device", предупреждая Вас, что ФС заполнена. tmpfs во всём схожа с обычными дисковыми ФС, за исключением того, что она непостоянна. Вы можете добавить в /etc/fstab соответствующую строку, чтобы tmpfs монтировалась после каждой перезагрузки.

Следует иметь ввиду, что ramfs и tmpfs - непостоянные ФС, которые находятся в памяти. Это значит, что при крахе системы, перезагрузке или останове - не важно по какой причине, все данные, хранящиеся на RAM-ФС, будут безвозвратно утеряны. Поэтому используя tmpfs Вам следует позаботиться о том, чтобы периодически делать дампы данных с tmpfs на постоянный носитель. Помните пословицу: "Безопасный, быстрый, дешёвый; выбирай любые два". Использование RAM-ФС - это быстрое и недорогое решение, но небезопасное.

Оригинал статьи на английском здесь.

ПОСЕТИТЕЛИ

free counters