NP2 Асинхронное программирование

Лекция: Асинхронное программирование в Python

Многие современные приложения тредуют работы в многопоточном и асинхронном режиме. В этой лекции мы познакомимся с тем, зачем как проектировать и писать программмы, выполняющиеся в несколько потоков, какие средства есть для этого в языке Python.

Презентация

Смотреть слайды к этому занятию.

Основы многозадачного программирования

Что такое блокировка потока?

Thread blocking Источник: NGINX.

Любая компьютерная программа - это последовательность инструкций, выполняемая на центральном процессоре. Инструкции идут одна за одной и выполняются последовательно на конвейере. Однако, довольно часто при работе программ центральному процессору приходится обращаться к внешним устройствам - оперативной памяти, жесткому диску, устройствам ввода-вывода. Все они работают очень медленно с точки зрения процессора.

Более того, выполнение таких операций может потребовать неопределенно долгого времени. Например, если в программе встречается инструкция чтения с клавиатуры, программа должна ждать ввода пользователя. Причем, она не может продолжиться до тех пор, пока эта инструкция не будет выполнена и не будет возвращено значение. Ведь инструкции в программе идут последовательно и выполнение следующей инструкции может зависеть от результата предыдущей. Такая ситуация называется блокировкой потока. То есть, остановка потока выполнения инструкций программы.

В операционных системах для этого существует специальный механизм - системные вызовы. Как мы знаем, программа не может напрямую обращаться к внешним устройствам, она должна вызвать соответствующую функцию операционной системы. В такой момент управление переходит к ней и программа должна ожидать окончания выполнения системного вызова.

Таким образом, при выполнении некоторых действий выполнение программы блокируется. То есть она будет ждать какое-то, заранее неопределенное время. Такие операции называются блокирующими. Примерами блокирующих операций являются:

  • обращение к внешним устройствам через механизм системных вызовов;
  • файловый и консольный ввод-вывод;
  • выполнение сетевых запросов;
  • явные инструкции ожидания (например, time.sleep в питоне);

Неблокирующими операциями являются инструкции, которые сразу же выполняются на центральном процессоре. К ним относятся математические операции, операции со строками, массивами, и другие простые манипуляции. Во время выполнения таких инструкций программа полностью загружает центральный процессор (или как минимум, одно его ядро).

На практике, большинство прикладных программ большую часть времени своего выполнения находятся именно в таком состоянии ожидания. Это приводит к нерациональному расходованию ресурсов. И именно для решения этой проблемы в операционных системах был придуман механизм многозадачности. Операционная система сама отслеживает состояние процессов и может переключить выполнение на другой, незаблокированный процесс пока этот ждет наступления внешнего события.

{% capture notice-2 %} Выводы:

  1. Программы в своей работе часто обращаются к медленным внешним устройствам.
  2. Блокирующими называются операции, которые требуют неопределенного времени.
  3. Классический пример - операция чтения с консоли.
  4. При выполнении блокирующей операции выполнение программы приостанавливается.
  5. Выполнение не может быть продолжено до наступления какого-то внешнего события.
  6. Управление передается операционной системе посредством системного вызова.
  7. Большинство программ большую часть времени просто ждут. {% endcapture %}
{{ notice-2 | markdownify }}

Откуда берется прирост скорости?

В зависимости от того, насколько часто та или иная программа (а точнее ее алгоритм) выполняет блокирующие операции, все алгоритмы можно разделить на две условные категории:

CPU and IO bound tasks Источник: Maliszewski@ResearchGate.

  • задачи, ограниченные процессором (cpu-bound tasks) - это программы, большую часть которых составляют вычислительные инструкции. Такие программы редко блокируются и сильно нагружают центральный процессор. Скорость их выполнения напрямую зависит от скорости работы процессора.
  • задачи, ограниченные вводом-выводом (io-bound tasks) - это такие программы, которые часто выполняют блокирующие операции, чаще всего - операции ввода-вывода. Такие программы наоборот, большую часть времени находятся в ожидании, и скорость их выполнения зависит от внешних факторов.

Надо подчеркнуть, что это характеристика самой задачи, то есть алгоритма, а не способа его решения. Если задача связана с вводом-выводом, ее нельзя сделать чисто вычислительной, и наоборот. Хотя, конечно, можно немного уменьшить или увеличить количество блокирующих операций, специальным образом спроектировав алгоритм, но в целом, именно сама решаемая задача будет определять, к какому из этих классов будет относиться программа.

Еще заметим, что данное деление, конечно, условное, и нет четкой границы между этими двумя классами. Это скорее непрерывный спектр, а эти классы - его полюса. Мы описываем его только потому, что разработчику программ полезно понимать, где в этом спектре находится его программа, так как от этого зависит, как ее выполнение можно ускорить. Ведь подходы к оптимизации таких программ существенно различаются.

Выполнение задач, ограниченных процессором, можно ускорить двумя способами. Во-первых, можно увеличить скорость процессора. Это очевидный, но не всегда возможный вариант. Во-вторых, некоторые такие задачи можно разделить на относительно независимые блоки, подзадачи, некоторые из которых можно выполнять одновременно. Такое возможно, если выполнение одного блока никак не использует результат выполнения других блоков. В таком случае, их можно выполнять одновременно, например, на разных ядрах одного процессора (или на разных процессорах, или даже машинах в кластере) и за счет этого получить прирост скорости. Такой подход называется параллелизмом или параллельным программированием.

Задачи, ограниченные вводом-выводом, сложно ускорить, используя более быстрый процессор. Это, конечно, не повредит, но такие алгоритмы только малую долю времени выполняются на нем. Здесь опять же поможет разбиение программы на независимые подзадачи. Но их уже не обязательно выполнять одновременно. Можно переключать подзадачи по мере блокировок, примерно так же как поступает операционная система с процессами. Такой подход называется асинхронное программирование. Еще вы можете встретить термин “оптимизация блокировок”.

Concurrency vs parallelism Источник: TechRocks.ru.

Выполнение программы в таком случае, перестает быть строго последовательным. То есть мы не можем быть уверены в том, что инструкции программы выполнятся именно в той последовательности, которую мы задали при написании программы. Это немного похоже на создание программ в процедурном стиле, когда вместо написания всех инструкций в линейной последовательности, вы создаете обособленные функции, а их основного кода можете вызывать их в произвольном порядке.

Chess simul Источник: ChessBase.

Еще это похоже на сеанс одновременной игры в шахматы. Гроссмейстер в этой аналогии - как процессор. Он думает быстро и его время очень ценно. А игроки - это задачи, которые часто “блокируют” его, то есть долго думают над ходом. Если бы гроссмейстер играл с каждым игроком последовательно, весь сеанс занял бы очень много времени. Поэтому на практике он не ждет окончания хода игрока, а перемещается к следующему и так далее по кругу. За исключением технических деталей, именно так и работают асинхронные программы.

Надо сказать, что в информатике существует путаница в терминах, связанных с этой темой. Не все четко понимают разницу между асинхронным и параллельным программированием. Тем более, что подзадачи тоже могут выполняться на разных ядрах. Поэтому не удивляйтесь, если в других источниках увидите другие определения асинхронности и параллелизма. Но здесь я буду использовать эти два термина для обозначения двух разных подходов к ускорению работы программ. Для общего названия практики программирования, заключающейся в разбиении алгоритма на независимые подзадачи я выбрал термин “Многозадачное программирование”. Многозадачное программирование, таким образом, включает в себя и асинхронность и параллелизм.

{% capture notice-2 %} Выводы:

  1. Все алгоритмы можно поделить на две условные категории.
  2. Задачи, ограниченные процессором - вычислительные, постоянно выполняются.
  3. Задачи, ограниченные вводом-выводом - часто ждут внешних событий
  4. Первую категорию можно ускорить только за счет параллелизма.
  5. Вторую - за счет асинхронности и, в меньшей степени, за счет параллелизма.
  6. В момент, когда одна задача блокируется, процессор может переключиться на другую.
  7. Асинхронное программирование очень похоже на сеанс одновременной игры.
  8. В терминах существует путаница. Особенно в русскоязычных. {% endcapture %}
{{ notice-2 | markdownify }}

Каковы основные недостатки многозадачного программирования?

Многозадачное программирование помогает решить насущную проблему - оптимизацию работы программ. И поэтому оно широко используется при создании сложных информационных продуктов. В первую очередь в этом подходе нуждаются сетевые приложения, так как обмен данными по сети - это блокирующие операции. Тем не менее, надо понимать, что применение многозадачного программирования порождает определенные сложности.

Мы уже упоминали, что некоторые задачи по самой своей природе являются последовательными. Если выполнение каждой следующей инструкции требует использование результата предыдущей, то мы никак не сможет разделить программу на независимые подзадачи. Это, конечно, редкий случай, но такое может случиться. Большинство задач, все же поддаются некоторой декомпозиции.

Приведем простой пример - алгоритм решения квадратного уравнения через дискриминант. Можно разделить эту задачу на три - вычисление дискриминанта, вычисление первого корня и вычисление второго. Причем, первый этап должен быть реализован в самом начале, так как его результат используется в последующих шагах. А вот вычисления корней являются независимыми задачами, и их можно выполнять в любой последовательности и одновременно. Это, конечно, вычислительно очень простая задача, но хорошо иллюстрирует пример частично распараллеливаемого алгоритма.

Многозадачное программирование тесно связано с реализацией механизма многозадачности. Такой механизм может реализовываться либо самой операционной системой через механизм процессов, либо библиотекой языка программирования. Многозадачность - это механизм псевдоодновременного выполнения нескольких задач (алгоритмов) на компьютере, когда система многозадачности периодически переключает выполнение одного потока команд на другой. Это называется переключение контекста. В любом случае нужно помнить о двух типах многозадачности - кооперативной и вытесняющей. при кооперативной многозадачности переключение на другую задачу происходит при блокировке текущей. При вытесняющей - система сама может переключить контекст в произвольный момент, между двумя любыми инструкциями.

Если ваши задачи являются полностью изолированными, каждая задача “не замечает” переключение контекста, ведь при возврате к выполнению этой задачи она продолжится с того же места без каких-либо побочных эффектов. Проблемы возникают, если задачи взаимосвязаны и используют какие-либо общие ресурсы. Например, общие переменные. Ведь пока задача была приостановлена, значение этой общей переменной могло поменяться, а текущая задача на это не рассчитывала. А на практике большинство задач так или иначе используют общие ресурсы - переменные (то есть общие области в памяти), файлы, внешние устройства, сеть.

Поэтому если какой-то алгоритм работает корректно в обычном, однопоточном режиме, он может работать некорректно в многозадачном виде именно из-за таких общих ресурсов. Это называется потоконебезопасные алгоритмы. При написании многозадачных программ нам нужно следить за тем, как задачи в них взаимодействуют и обеспечивать потокобезопасность. Это дополнительные сложности, с которыми мы сталкиваемся при проектировании и реализации многозадачных программ.

Еще одна большая сложность при реализации многозадачных программ - сложность их отладки. Особенно при использовании вытесняющей многозадачности программист не может предсказать моменты переключения контекста. В результате инструкции выполняются в случайном порядке, что может приводить к ошибкам, которые то появляются, то исчезают. Ловить и исправлять такие ошибки довольно сложно.

Вместе с этим надо сказать, что распараллеливание алгоритма не является панацеей для быстродействия. Ведь переключение контекста - это сам по себе довольно длительный процесс. Конечно, обычно не приходится реализовывать собственный механизм многозадачности - есть уже готовые инструменты, но все равно, на создание задач и переключение между ними тратится время и ресурсы. Существует даже понятие чрезмерного распараллеливания, когда большая часть времени выполнения программы тратится не на выполнение полезной задачи, а на переключение между задачами.

Поэтому прирост скорости при использовании многозадачности никогда не бывает кратным. Даже если вы разбили программу на пять задач и запустили одновременно на разных ядрах процессора, не ожидайте пятикратного ускорения программы. Насколько в реальности может быть ускорена программа зависит от множества факторов, все, что мы можем сказать, что ускорение будет меньше, чем в пять раз.

Несмотря на все сложности, параллельное и асинхронное программирование - очень распространенные техники, которые зачастую помогают решить насущные проблемы разработки сетевых приложений. Часть только с помощью распараллеливания можно реализовать некоторую функциональность, которая в чисто последовательном режиме была бы просто невозможной. Например, создание сервера, который может одновременно обслуживать нескольких клиентов. Так что знание основ многозадачного программирования - необходимый навык для любого разработчика.

{% capture notice-2 %} Выводы:

  1. Некоторые задачи являются в принципе последовательными.
  2. Такие программы сложнее проектировать, реализовывать и отлаживать.
  3. Существует понятие потокобезопасности, которую надо обеспечивать.
  4. Одновременное программирование связано с механизмами многозадачности.
  5. Вытесняющая многозадачность означает непредсказуемое переключение контекста.
  6. Не стоит забывать про накладные расходы - это все не бесплатно.
  7. Прирост скорости даже в идеальном случае не будет кратным. {% endcapture %}
{{ notice-2 | markdownify }}

Чем отличаются процессы и потоки?

Processes and threads Источник: D. Kurtz.

Для обеспечения многозадачности существуют известные механизмы операционной системы - процессы. Вы можете спроектировать программу так, чтобы она состояла из нескольких взаимодействующих процессов. Тогда запуском, переключением и управлением процессами займется операционная система. Здесь надо помнить два факта. Во-первых, каждый процесс является изолированным участком памяти, который имеет свой стек, собственные структуры данных, исполняемый код, и множество скрытых атрибутов. Во-вторых, все современные операционные системы реализуют именно вытесняющую многозадачность. То есть процессы могут быть приостановлены и переключены в произвольные моменты времени, вы как разработчик никак на это не можете повлиять.

Управление процессами - это прерогатива операционной системы. То есть все операции с процессами - создание, переключение, уничтожение - производится исключительно самой системой. Но на предоставляет специальные системные вызовы для того, чтобы сами процессы могли совершать необходимые действия. Например, системный вызов exec порождает новый процесс. А системный вызов exit - уничтожает текущий (этот системный вызов автоматически выполняется при достижении конца программы, фактически - это всегда последняя инструкция в любой программе). Поэтому любой процесс может запросить у операционной системы создание нового процесса. После этого в него можно загрузить определенную программу и запустить его на выполнение. Так как все эти операции осуществляются через механизм системных вызовов, они никак не зависят от языка программирования.

Но надо помнить, что все действия, связанные с переключением процессов, не имеют системных вызовов. Сами процессы никак не могут повлиять на то, когда операционная система может их приостановить. Обычные, немногопоточные программы как правило этого и не замечают, так как они возобновляют работу с того же места, для них этой приостановки как бы и не существовало. Но как только вы начинаете создавать многопоточные или многопроцессные приложения, приходится обращать внимание на этот механизм вытесняющей многозадачности. Более того, создание, переключение и уничтожение процесса - это довольно сложные и длительные операции. Кончено, с человеческой точки зрения это происходит мгновенно, много раз в секунду. Но с точки зрения компьютера это занимает довольно много времени. Поэтому операционные системы не переключают процессы очень часто.

В рамках одного процесса могут быть организованы так называемые потоки - отдельные задачи, каждая из которых имеет собственный алгоритм работы. Потоки похожи на процессы в том, что они выполняются независимо друг от другая, “параллельно”. Потоки так же могут блокироваться, ведь в ходе их выполнения могут быть выполнены блокирующие операции. Таким образом потоки тоже могут находиться либо в заблокированном состоянии, либо в состоянии готовности к выполнения, либо в состоянии выполнения в данный момент. Основная идея процессов состоит в том, что при выполнении блокирующей операции не происходит приостановки всего процесса операционной системы. Блокируется только текущий поток. Если в программе есть другие незаблокированные потоки, то есть потоки в состоянии готовности, то выполнение будет переключено на них. Это позволит не блокировать процесс целиком. Это называется оптимизация блокировок. За счет этого может произойти ускорение работы, так как программа будет меньше времени проводить в ожидании.

Любой процесс имеет как минимум один поток - главный. В процессе своего выполнения программа может создать новый поток и загрузить в него какой-то программный код. Создавать можно столько потоков, сколько необходимо. В отличие от процесса, все потоки выполняются в рамках одного процесса. Поэтому они имеют полный доступ ко всем общим ресурсам процесса - например, к переменным. Вообще, все потоки одного процесса имеют общую память - область памяти их процесса.

Вообще говоря, потоки, как и процессы - концепция операционных систем. Но на практике ни одна распространенная операционная система не поддерживает потоки на уровне ядра. Поэтому сейчас реализацией потоков занимается на ядро операционной системы (как с процессами), а внешние библиотеки. Поэтому существует множество реализаций потоков в разных библиотеках, в разных языках программирования. Мы будем изучать потоки на примере стандартной библиотеки Python, но даже в этом языке программирования существуют другие библиотеки для многопоточности. Их конкретная реализация может различаться в деталях и в механизме организации потоков.

{% capture notice-2 %} Выводы:

  1. Для обеспечения многозадачности в операционной системе используются процессы.
  2. Операции с процессами происходят через системные вызовы и не зависят от языка.
  3. Создание процесса - довольно длительная процедура.
  4. Процессы управляются операционной системой, у программиста нет над ними контроля.
  5. Потоки выполняются в рамках одного процесса и имеют общую память.
  6. Потоки сейчас не поддерживаются на уровне ядра никакими популярными ОС. {% endcapture %}
{{ notice-2 | markdownify }}

Что может пойти не так?

Как мы уже говорили, при разработке обычных, немногозадачных программ не возникает никаких проблем при блокировке и переключении процессов. Так как процессы являются полностью изолированными единицами выполнения программного кода, когда процесс возобновляется, он продолжает свое выполнение в том же состоянии, на котором он остановился. Операционная система сама при переключении процесса переносит в нужные области оперативной памяти всю информацию, необходимую процессу для выполнения.

По-другому все обстоит в работе многозадачных приложений. Дело в том, что несколько потоков выполнения могут работать с общими ресурсами. Рассмотрим такую ситуацию. Программа состоит из двух потоков. Каждый из них работает с одним и тем же файлом - осуществляет чтение и запись в него. Допустим, посередине чтения из этого файла текущий поток прервался из-за вытесняющей многозадачности. Управление переключилось на второй поток программы, который начал запись в этот файл. После этого потоки опять переключились и управление вернулось к первому. С его точки зрения, посередине чтения содержимое файла изменилось. Это может привести к непредсказуемым последствиям, так как нарушает логику работы алгоритма.

Давайте рассмотрим классический пример. Допустим, мы пишем банковское приложение для работы со счетом клиента. И у нас есть функция списания денег со счета. Она принимает один аргумент - количество денег, которые надо списать. Допустим, так же, что счет клиента - дебетовый, то есть на нем не может быть долга. Тогда наша функция должна проверять, есть ли у клиента необходимая сумма. Если да, то списываем (и возвращаем, например, True), а если нет - отказываем. Это может выглядеть примерно так:

1
2
3
4
5
6
7
8
9
account = 100

def acquiring(amount):
	global account
	if account > amount:
		account -= amount
		return True
	else:
		return False

Довольно простая и абсолютно правильная логика. Наша программа будет работать без ошибок - сколько бы раз мы не вызвали функцию, счет в минус не уйдет, там стоит на это проверка.

Но допустим, для ускорения банковского обслуживания мы хотим сделать нашу программу многопоточной - чтобы функция эквайринга платежа выполнялась в новом потоке. Таким образом, можно одновременно обрабатывать несколько платежей, ускоряя работу программы.

Конечно, на практике такая простая функция и так будет работать быстро и не сильно выиграет от многопоточности. Но в реальности эта функция вполне может содержать, например, несколько запросов к удаленной базе данных, которые занимают много времени и блокируют поток. Так что в реальных приложениях есть что оптимизировать многопоточностью.

После введения многопоточности такая программа уже не будет работать гарантированно правильно. Допустим, у нас на счете 50 денег и мы списываем два раза по 30 денег. По идее, второе списание не должно пройти. Но вот, что может случиться. Если одновременно запущено два экземпляра этой функции, то первый поток может прерваться уже после проверки условия (проверка показала истину, так как 50 больше 30), но перед уменьшением баланса счета. В этот момент включается второй экземпляр функции, который тоже выполнит проверку, получит истинный результат, спишет деньги и закончится. А значит, выполнение вернется в первый поток, который продолжится с точно того же места, на котором прервался. То есть уже после проверки условия. И таким образом произойдет двойное списание.

Такая ситуация называется проблемой доступа к общим ресурсам. Общим ресурсом может быть не только файл, но и какая-то область в памяти, например, переменная, потоки ввода-вывода, любые внешние устройства. Очень часто в многопоточных программах общими ресурсами выступают переменные. Если несколько потоков работают одновременно с одними переменными, то они могут изменяться без ведома какого-либо потока.

То же самое происходит при обращении к внешним устройствам, например, к консоли. Вывод может перепутываться между потоками, нарушаться последовательность операций. В общем случае можно сказать, что проблема многопоточных программ состоит в том, что программист не может однозначно определить порядок выполнения операций в программе, состоящей из нескольких потоков.

Программы, которые корректно работают как в однопоточном, так и в многозадачном режиме называются потокобезопасными. Потокобезопасность - это свойство алгоритмов, программ, даже структур данных, которое заключается в способности правильно работать при наличии множества потоков. Обеспечение потокобезопасносности - это обязанность программиста, который разрабатывает многопоточные программы.

Одна из главных трудностей, с которыми встречается программист при написании многопоточных программ это то, что ошибки потокобезопасности очень трудно зафиксировать. В нашем примере со счетом двойное списание может произойти только если сложится очень определенная комбинация условий внешней среды. В какой момент произойдет переключение потока определяет операционная система исходя их множества факторов. Поэтому если мы запустим нашу программу вручную, скорее всего, все выполнится правильно. И тысячу раз все может выполняться правильно, а на тысяча первый - произойдет двойное списание. Поэтому ошибки потокобезопасности очень трудно заметить. Ведь такое двойное списание происходит и в реальности в настоящем банковском программном обеспечении, с его кучей степеней защиты, множеством стандартов и процедур тестирования.

Обеспечение потокобезопасности основывается на том, что ее нарушения возникают при доступе к общим ресурсам. Поэтому если ограничить возможность нескольких потоков одновременно обращаться к одному и тому же ресурсу, то программа станет работать корректно, так как избежит проблемы, описанной выше. Такой механизм называется блокировка ресурсов.

{% capture notice-2 %} Выводы:

  1. Проблемы возникают, когда подзадачи обращаются к общим ресурсам.
  2. Если у потоков есть общая переменная, она может измениться без ведома потока.
  3. Если несколько потоков пишут данные в один файл, вывод может быть перепутан.
  4. То же самое может произойти с выводом в консоль.
  5. Потокобезопасность - это свойство программы работать правильно в конкуррентном режиме.
  6. Программист должен обеспечить потокобезопасность своей программы.
  7. Самый понятный способ обеспечения потокобезопасности - ввести блокировки ресурсов. {% endcapture %}
{{ notice-2 | markdownify }}

Как заблокировать доступ к ресурсу?

Locks Источник: GeeksForGeeks.

Так как большинство проблем при работе многопоточных программ происходит при одновременном доступе нескольких потоков к одному общему ресурсу, самым решением этой проблемы будет запретить доступ нескольких потоков к одному и тому же ресурсу. Для этого существуют специальные объекты - замки, которые помогают заблокировать доступ к ресурсу.

Концептуально, замок работает очень просто. Это некоторый особый объект, у которого есть всего два действия - закрыть и открыть (обычно их называют “получить доступ”, acquire и “освободить”, release). При создании замок находится в состоянии “свободный”. Когда какой-то поток хочет поработать с определенным ресурсом (например, общей переменной), он получает досуп к замку. Получить доступ можно только к свободному замку. В этот момент замок переходит в состояние “закрытый”. Если в этот момент другой поток тоже захочет получить доступ, он не сможет, так как замок находится в состоянии “закрыт”. В этот момент второй поток блокируется и начинает ждать, когда поток, закрывший замок его освободит. Таким образом, два потока не могут работать одновременно с одним и тем же ресурсом. Поток, который первый закрыл замок заставит остальные потоки, даже если он сам прервется и передаст им управление, ждать, пока он не закончит работу с ресурсом и не высвободит замок.

Замки - это предохранительный механизм, с помощью которого программист может обеспечить потокобезопасность своих программ. Для того, чтобы эта схема сработала, разработчик должен найти все места в своей многопоточной программе, в которых идет работа с общим для нескольких потоков ресурсом, и “закрыть их на замок” - поставить перед началом работы получение доступа к замку, а в конце - освобождение замка. Не нужно думать, что использование замка в одном потоке обезопасит общий ресурс независимо от того, что происходит в других. Нет, если другие потоки не проверяют замок перед началом работы с ресурсом, то программа все разно будет потоконебезопасной.

Нужно отметить, что операции получения доступа и освобождения замка являются атомарными - потоки выполнения не могут прерваться посередине проверки. Именно поэтому, кстати, не получится обойтись “самодельными” замками. Ведь можно подумать, что такую проверку на занятость ресурса можно провести и просто условием, например так:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
locked = False

# thread 1
if not locked:
	global locked
	if not locked:
		lock = True
		do_something(resource)
		lock = False

# thread 2
if not locked:
	global locked
	if not locked:
		lock = True
		do_something_else(resource)
		lock = False

Этот псевдокод в принципе полностью повторяет логику работы замка. Однако, так все равно делать не нужно. Ведь мы говорим о вытесняющей многозадачности - то есть операционная система может прервать выполнение потока в произвольном месте. И вполне может случиться так, что выполнение прервется после проверки условия, но перед “закрытием” замка. Тогда второй поток подумает, что замок свободен, и также начнет работу с ресурсом. Если в этот момент управление вернется первому потоку, он продолжится с того же места, то есть уже после проверки условия, по ветке “then”. То есть тоже начнет работу с ресурсом.

Замки, которые существуют в библиотеках для многопоточного программирования реализованы специальным образом на низком уровне так, что проверка условия и изменение состояния замка - это одна атомарная операция. Так что даже если поток переключится, мы либо войдем в замок, либо нет. Поэтому одновременной работы с ресурсом не происходит.

Замок помогает заблокировать доступ к ресурсу. А что делать, если потоки в программе используют несколько общих ресурсов? Нужно создать по одному замку на каждый общий ресурс. Будь то переменные, сетевые сокеты, вывод в терминал или файлы - вы должны отследить все общие ресурсы - все “места пересечения” потоков вашей программы и работу с каждым ресурсом закрыть замком. Конечно, чем сложнее программа, чем с большим количеством объектов работают потоки, тем сложнее все отследить.

Именно поэтому, кстати, в Python существует так называемая глобальная блокировка интерпретатора. Разработчики языка в какой-то момент решили, что ставить отдельный замок на каждую блокирующую операцию - это слишком сложно и чревато ошибками. И поэтому поставили общую блокировку - одновременно только один поток может выполняться на интерпретаторе. Из-за этого многие считают реализацию многопоточности в Python неполноценной.

Но если вы работаете с несколькими ресурсами в многопоточной программе, вас поджидает еще одна потенциальная проблема - дедлоки, или взаимоблокировка. Давайте представим, что у нас есть два потока, каждый из которых работает с двумя ресурсами - А и В. Первый поток получает доступ к ресурсу А (закрывает соответствующий замок), и начинает с ним работать. Не освобождая А, он еще хочет получить доступ к ресурсу В. Тут допустим, что в это время второй поток уже получил доступ к ресурсу В (закрыл замок ресурса В) и хочет получить доступ к А. Получается неразрешимая ситуация.

Deadlock Источник: Pro-Java.

Первый поток не может продолжится и блокируется потому, что он ждет, когда второй поток освободит ресурс В, а второй поток тоже заблокирован и ждет, когда первый поток освободит ресурс А. Так что ни один поток не может продолжать выполняться. И по сути вся программа намертво блокируется. Выхода их дедлока нет, можно только прервать задачу.

Но избежать такой ситуации при написании многопоточной программы довольно просто. Нужно всего лишь договориться, всегда блокировать ресурсы в строго определенном порядке. Причем это нужно только тогда, когда поток хочет одновременно поработать с двумя и более ресурсами. Хочешь работать в ресурсом А - пожалуйста. Нужен один ресурс В - тоже никаких ограничений. А вот если нужно и то и другое сразу - то всегда блокируем сначала ресурс А, а потом - В. И никогда не наоборот. Тогда дедлок никогда не сможет случиться.

Замки - это самый простой способ блокировки ресурсов. В более сложных случая могут понадобиться более продвинутые способы. Например, более сложной версией замка является семафор - замок со встроенным счетчиком подключений. Он позволяет ограничить количество одновременно работающих с ресурсом потоков. Такое полезно, например, для ограничения нагрузки на какие-либо участки программы. Например, для ограничения количества одновременных подключений к серверу или базе данных.

В заключении надо сказать, что даже самые простые замки сильно помогают обеспечить потокобезопасность программы. Но в замками не следует перебарщивать. Нужно помнить, что ожидание замка - это блокирующая операция, а смысл многопоточности - оптимизация блокировок. Может показаться соблазнительным не разбираться сильно и закрыть замком большую часть многопоточного алгоритма. Но это значит, что он сможет выполняться только в одном потоке за раз. Другими словами, если закрыть замком весь поток, то исчезнет смысл вообще использовать потоки - программа будет выполняться в синхронном режиме, как обычно. Да, он будет полностью потокобезопасно, что что толку? Поэтому замками надо закрывать минимально, только самые критические участки алгоритма программы, в которых идет именно работа с общим ресурсом.

{% capture notice-2 %} Выводы:

  1. В операционных системах существуют специальные объекты - замки.
  2. Замок можно получить или освободить - это атомарные операции, они не могут быть прерваны.
  3. При попытке получить заблокированный замок поток блокируется - ждет его освобождения.
  4. При доступе к замку он блокируется, чтобы другой поток не мог в это время работать с ресурсом.
  5. На каждый ресурс нужно использовать один замок.
  6. Есть опасность попасть в дедлок - ресурсы нужно блокировать в определенном порядке.
  7. Существуют более продвинутые способы блокировки - семафоры и мьютексы.
  8. Чем больше блокировок использовать, тем меньше толку от многопоточности. {% endcapture %}
{{ notice-2 | markdownify }}

Какие инструменты Python надо знать?

Во всех языках программирования существуют специальные механизмы реализации как параллельного, так и асинхронного программирования. Здесь мы рассмотрим возможности языка Python для многозадачного программирования. На примере этого языка мы познакомимся с основными механизмами обеспечения многозадачности, какие они бывают, в чем их особенности и как выбрать нужный механизм для решения конкретной задачи.

Любой язык программирования позволяет получить доступ к механизму системных вызовов операционной системы. Что-то реализуется встроенными конструкциями языка, что-то - средствами стандартной библиотеки. Создание нового процесса и выполнение функции в новом процессе - это одна из стандартных возможностей операционной системы.

В языке Python для работы с многопроцессорностью существует стандартный модуль multiprocessing. Он позволяет выделить функцию или класс в новый процесс. Процессы в данном случае управляются операционной системой и, поэтому, реализуют вытесняющую многозадачность. При работе с процессами следует помнить, что создание процесса - достаточно дорогая процедура. Поэтому если распараллеливать задачу слишком сильно, можно добиться обратного эффекта снижения производительности - большая часть времени выполнения программы будет уходить на создание, уничтожение и переключение процессов.

Особенно явно это в случае с интерпретируемыми языками, как Python. Ведь программа на таких языках не выполняется непосредственно. Программу выполняет интерпретатор. А если программа состоит из нескольких процессов, то в каждый из них должен быть загружен свой экземпляр интерпретатора. А это занимает и время и оперативную память.

Потоки - более легковесная сущность. Но, как мы говорили, потоки не поддерживаются современными операционными системами. Поэтому они реализуются программно в библиотеках. Это значит, что в разных библиотеках и разных языках программирования может существовать собственна реализация многопоточности и как она работает - зависит от конкретной библиотеки. В Python существует стандартный модуль для написания многопоточных программ - threading. Потоки, которые он создает управляются не операционной системой, а этим модулем. Но он так же реализует вытесняющую многозадачность.

У модуля threading в Python есть одна особенность - глобальная блокировка интерпретатора (GIL, global interpreter lock). Для обеспечения потокобезопасности многопоточных программ, написанных и использованием этого модуля, в него встроен один глобальный замок, блокирующий доступ к интерпретатору только для одного потока за раз. Это значит, что в каждый конкретный момент времени может выполняться только один поток. Даже если компьютер имеет несколько вычислительных ядер, GIL не позволит использовать истинную параллельность, то есть одновременное выполнение нескольких потоков на процессоре.

В языке Python есть еще один механизм многозадачного программирования - модуль asyncio. Он использует еще более легковесные сущности, даже легче, чем потоки - так называемые асинхронные корутины. Из-за особенностей реализации, такие корутины практически не занимают дополнительной памяти и не тратят время на создание или удаление. За счет этого, вы можете создать просто огромное количество корутин, то есть распараллелить задачу очень сильно.

Все асинхронные корутины выполняются в одном процессе и даже, формально, в одном потоке. Поэтому они тоже не дадут возможности параллельного выполнения. Но в отличие от threading этот модуль реализует кооперативную многозадачность. То есть программист при написании программы сам решает, в какие моменты одна задача может переключится на другую. Это очень облегчает проектирование и отладку асинхронных программ. Проблема потокобезопасности практически перестает существовать.

Однако, не обходится и без недостатка. Из-за особенностей внутреннего устройства этого модуля, в нем нельзя использовать стандартные блокирующие операции. То есть, если вы в асинхронной программе используете, например, оператор input() или sock.recv(), то заблокируется весь поток, со всеми корутинами. В модуле asyncio реализованы собственные, неблокирующие версии всех популярных блокирующих операторов и методов. И если вы пишите программу заново, это не проблема. Однако это сильно затрудняет модификацию уже существующего кода, чтобы сделать его асинхронным.

  Процессы Потоки Async
Оптимизация блокировок вытесняющая вытесняющая кооперативная
Использование нескольких ядер да нет нет
Масштабируемость низкая (десятки) средняя (сотни) высокая (тысячи+)
Использование стандартных блокирующих операций да да нет
GIL нет да нет

Сравнение самых распространенных методов многозадачного программирования в Python можно видеть в таблице. Далее мы подробно поговорим про каждый из них, и будем приводить примеры кода. Сейчас же надо сказать, что не существует идеального решения, которое было бы лучше во всех условиях. Поэтому для решения конкретной задачи надо уметь выбирать нужный механизм многозадачности.

Если вам нужно ускорить задачу, ограниченную процессором, то это можно сделать только за счет параллелизма, то есть задействовав несколько вычислительных ядер или процессоров. Это умеет делать только модуль multiprocessing. Но стоит помнить, что нет смысла создавать процессов больше, чем в системе вычислительных ядер. Если вам нужно распараллелить задачу больше, нужно выбирать либо threading, либо asyncio.

Модуль threading - это классическая реализация многопоточности, она средняя по всем параметрам и подойдет в большинстве случаев. Особенно если вы хотите сделать многопоточным уже существующий код и не хотите его полностью переписывать. Если же вам нужно создать тысячи или десятки тысяч задач, то стоит обратить внимание на модуль asyncio. Надо отметить, что судя по развитию новых версий, именно этот модуль становится основным в Python, и именно на него делают ставку авторы и разработчики языка.

{% capture notice-2 %} Выводы:

  1. Все языки программирования позволяют создавать процессы.
  2. Почти у всех языков программирования есть средства многопоточного программирования.
  3. В питоне три стандартные библиотеки дают возможность параллельного программирования.
  4. Модуль multiprocessing позволяет писать многопроцессные программы.
  5. Модуль threading позволяет писать многопоточные программы.
  6. Модуль asyncio позволяет создавать асинхронные задачи с кооперативным переключением.
  7. У каждого подхода свои недостатки и своя ниша. {% endcapture %}
{{ notice-2 | markdownify }}

Многопоточное программирование на Python

Как выделить функцию в поток?

Давайте начнем рассмотрение многопоточного программирования с модуля threading. Он предназначен для выполнения определенного участка кода в отдельном потоке. В поток выполнения необходимо загрузить какую-то последовательность команд. Проще всего для этих целей использовать функцию. Давайте рассмотрим простой пример создания потоков:

1
2
3
4
5
6
7
8
9
import threading 

def proc():
    print("Процесс")
 
p1 = threading.Thread(target=proc, name="t1")
p2 = threading.Thread(target=proc, name="t2")
p1.start()
p2.start()

В этой программе мы импортировали модуль, затем объявили функцию. Эта функция будет выполняться в новом потоке. Обратите внимание, что мы ни разу не вызываем эту функцию явно - тогда она выполнится по месту, в том же потоке, как обычно. Вместо этого, мы передаем эту функцию как параметр при создании объекта Thread. Этот объект будет хранить информацию о потоке и через него можно к этому потоку обращаться. Например, запустить его на выполнение. Еще одним параметром является имя. Задавать его необязательно, но через него удобно дифференцировать потоки.

После создания потока его можно запустить на выполнение, вызвав метод start(). Без этого, код потока не выполнится. Именно после команды start() произойдет выполнение кода, который был написан в целевой функции потока.

В любой программе обязательно есть один поток выполнения - главный. Он содержит основной код программы. Именно он выполняется по умолчанию. В момент запуска нового потока на выполнение программа как бы “раздваивается” - появляется новый поток, но вместе с ним продолжает существовать и основной. Выполнение потока является неблокирующей операцией. Рассмотри такой пример:

1
2
3
4
5
6
7
8
9
10
import threading 
import time 

def proc():
    time.sleep(5)
    print("Second thread")
 
threading.Thread(target=proc).start()

print("Main thread")

В этой программе выполнение второго потока занимает 5 секунд. Но когда мы запустим эту программу, мы увидим, что надпись “Main thread” появилась сразу же. Это потому, что программа не ждет окончания выполнения нового потока, как было бы в случае с обычной функцией, а сразу продолжает выполнение следующих инструкций. То есть основной поток продолжается несмотря на вызов второго потока.

Это тем более верно, так как метод time.sleep() является блокирующим. Поэтому даже если после выполнения сразу начал исполняться именно второй поток, после инструкции sleep() он заблокировался и управление перешло к первому (основному). Поэтому такая программа всегда покажет сначала “Main thread”, а потом, через 5 секунд - print(“Second thread”).

Это и есть оптимизация блокировок. Рассмотрим еще более явный пример:

1
2
3
4
5
6
7
8
9
10
11
import threading
import time 

def proc():
    time.sleep(5)
    print("Second thread")
 
threading.Thread(target=proc).start()
threading.Thread(target=proc).start()

print("Main thread")

В этой программе мы два раза запускаем эту же функцию, каждый раз в новом потоке. Если бы мы использовали обычный, однопоточный подход, то выполнение этой программы заняло бы 10 секунд. Многопоточная программа выполняется за 5 секунд - это максимальное время выполнение одного потока. Это происходит за счет того, что программа ждем 5 секунд два раза не последовательно, один за другим, а как бы одновременно.

Давайте рассмотрим еще один пример. В нем мы создаем два потока, и в каждом выводим имя текущего потока.

1
2
3
4
5
6
7
8
9
10
11
12
13
import threading
import time

def proc():
    time.sleep(1)
    print(threading.current_thread().name)

p1 = threading.Thread(target=proc, name='thread 1')
p2 = threading.Thread(target=proc, name='thread 2')
p1.start()
p2.start()

print("End")

Обратите внимание, как в threading можно получить доступ к объекту текущего потока - вызвав threading.current_thread(). В этом модуле вообще довольно много полезный функций, поэтому очень рекомендуем обратиться к документации к нему для вашей версии Python.

При выполнении данной программы вы чаще всего будете видеть такой вывод в консоли:

1
2
3
End                                        
thread 2                                       
thread 1

но иногда можно увидеть и так:

1
2
3
End                                           
thread 1                                   
thread 2 

То есть не обязательно первый поток выполнится первым. Даже при условии, что они полностью идентичны по содержанию. Всегда помните, что теперь, в мире многопоточного программирования, нельзя определенно сказать, в каком порядке будут выполняться инструкции вашей программы. Это зависит от большого числа факторов, в том числе внешних по отношению к алгоритмы программы. Поэтому рассчитывать на определенный порядок выполнения потоков лучше не стоит. В этом большая гибкость, но и главная сложность многозадачного программирования.

{% capture notice-2 %} Выводы:

  1. Для создания потока нужен обособленный участок кода - проще всего использовать функцию.
  2. Создание и запуск потока - разные операции, они не обязательно идут последовательно.
  3. При запуске потока программа “раздваивается” - появляются новый и основной поток.
  4. Основной поток продолжает выполняться и после запуска нового.
  5. В каком порядке будут выполнены инструкции нельзя предсказать. {% endcapture %}
{{ notice-2 | markdownify }}

Как передать параметры в поток?

Очень часто при проектировании многопоточных программ в отдельных потоках выполняются однотипные задачи. Редко приходится отдельно описывать код каждого потока индивидуально. Обычно мы пишем одну функцию, которая будет выполняться в отдельном потоке много раз, чтобы распараллелить алгоритм. Конечно, практически никогда не нужно много раз выполнять полностью идентичный код. Чаще всего он отличается какими-то небольшими параметрами. Для этого мы используем аргументы функций.

Давайте рассмотрим, как в модуле threading можно передать в функцию параметр. Как вы знаете, в многопоточной программе фукнция-поток не вызывается явно, поэтому существует отдельный синтаксис задания параметров функции при создании потока. Вот пример:

1
2
3
4
5
6
7
8
9
import threading
 
def proc(n):
    print "Процесс", n
 
p1 = threading.Thread(target=proc, name="t1", kwargs={"n": "1"})
p2 = threading.Thread(target=proc, name="t2", args=[2])
p1.start()
p2.start()

Из данного кода довольно очевидно, что можно передавать аргументы двумя способами. Если вам удобнее передавать аргументы по месту (как позиционные при обычном вызове функции), то вам понадобится параметр args, в котором можно задать массив значений аргументов функции. Конечно, размер массива должен совпадать с количеством обязательных аргументов функции, указанных при ее объявлении.

Но иногда проще задавать аргументы не по месту, а по имени. Например, если у функции много необязательных аргументов и вы хотите передать только некоторые из них. Или вам удобнее передавать аргументы в другому порядке, нежели в каком они объявлялись. Наконец, задание аргументов с явным указанием имени служит для повышения читаемости кода, названия аргументов часто поясняют их смысл и служат самодокументацией.

В таком случае, следует воспользоваться параметром kwargs, в котором передаваемые аргументы указываются в виде словаря. Ключом в словаре служит имя аргумента, как оно указано в объявлении функции, а значением - собственно значение аргумента.

Обычно вместе в передачей аргумента в функцию рассматривают и возвращение из функции. Но здесь не все так просто. Дело в том, что в функциях, работающих в отдельных потоках привычный оператор return бесполезен. Вы, конечно, можете его использовать (например для управления выходом из функции как часто делают), но значение вы так не получите. Опять же, вы не вызываете свою функцию явно, поэтому не сможете написать res = func().

Для того, чтобы получить значение из функции, необходимо воспользоваться другими механизмами. В ряде случаев можно выводить результат работы функции в консоль. Или пользоваться общими переменными, файловым выводом. Более подробно получение результата мы рассмотрим позднее.

{% capture notice-2 %} Выводы:

  1. Для дифференциации потоков в функцию-поток можно передать параметр.
  2. Это очень удобно для распараллеливания параметрических действий.
  3. Обратите внимание, что вернуть значение их потока не так просто.
  4. Для получения результата работы потока используется консоль, общие переменные или файлы. {% endcapture %}
{{ notice-2 | markdownify }}

Как выделить класс в поток?

Если ваш многопоточный алгоритм достаточно сложен, то иногда хочется воспользоваться объектно-ориентированным программированием. Программирование в классах предоставляет более широкие возможности. И в модуле threading есть возможность выделить в поток не просто функцию, а метод класса.

Давайте сразу рассмотрим пример кода, который иллюстрирует использование классов для создания потока:

1
2
3
4
5
6
7
8
9
10
11
12
13
import threading 
 
class T(threading.Thread):
  def __init__(self, n):
    threading.Thread.__init__(self, name="t" + n)
    self.n = n
  def run(self):
    print "Процесс", self.n
 
p1 = T("1")
p2 = T("2")
p1.start()
p2.start()

Итак, вместо функции мы объявляем класс. Этот класс должен быть потомком класса Thread. Как часто бывает при наследовании от библиотечного класса, в конструкторе мы должны вызвать конструктор родительского класса. Это необходимо для реализации необходимой функциональности. Кроме этого в конструкторе мы может инициализировать какие-то необходимые поля этого класса.

Сам код, который будет выполняться в новом потоке должен содержаться в переопределенном методе run() данного класса. Обратите внимание, что они не принимает никаких аргументов (кроме обязательного self). В случае с методом класса это и необязательно, как как всю необходимую информацию можно передать в метод через поля класса.

Кроме этих двух методов в классе могут быть и другие методы или поля. Эти два - обязательные.

Обратите внимание на то, как происходит создание и запуск потока. Для создания нам нужно инстанцировать этот класс. Это, конечно, можно сделать много раз, столько, сколько нужно. Так как мы получили объект того же класса threading.Thread, как и классы, с которыми мы работали раньше, у него уже реализованы все методы для работы с потоками. Например, метод start(), с помощью которого происходит запуск потока.

{% capture notice-2 %} Выводы:

  1. Часто функции недостаточно и в потоке можно запустить класс.
  2. При этом выполняться будет именно метод run().
  3. Это требуется, если в функцию придется передавать очень много значений и удобнее хранить их в полях класса. {% endcapture %}
{{ notice-2 | markdownify }}

Как завершить выполнение потока?

При выполнении многопоточной программы почти всегда возникает необходимость воспользоваться результатами работы созданных потоков. Мы уже обсуждали, что просто так из функции нельзя вернуть значение. Для того, чтобы второй поток смог передать в основной результат своего выполнения проще всего использовать общую переменную. Давайте рассмотрим простой пример:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import threading
import time

res = "Initial value"

def proc():
    time.sleep(1)
    global res
    res = "The thread result"

p = threading.Thread(target=proc)
p.start()

print(res)

В этой программе мы создали переменную, в которую будем записывать результат работы потока. При старте программы поместим туда какое-то начально значение. В функции-потоке мы выполняем действия, обращаемся к переменной как к глобальной и записываем в нее результат. После этого в основной программе создаем и запускаем поток, а потом выводим результат на экран.

При запуске этой программы мы сталкиваемся с проблемой. Мы же помним, что при запуске нового потока программа раздваивается и основной поток программы продолжает исполняться. Таким образом инструкция вывода исполнится сразу же. К этому времени значение переменной не успеет измениться, так как поток еще не закончил свою работу.

Нам нужен какой-то способ выполнить определенные действия после окончания работы другого потока. Такая задача называется синхронизацией потоков. Существуют специальные механизмы, которые позволяют выполнять действия в одном потоке по событиям или условиям в другом потоке.

Самым простым механизмом синхронизации потоков является присоединение потока. У объекта Thread есть метод join(). Он работает интересным образом. Он блокирует текущий поток (тот, в котором написан join) до момент окончания того потока, у которого он вызван. Таким образом этот метод позволяет как бы “подождать” окончания другого потока. Давайте воспользуемся этим методом для того, чтобы вывести результат работы уже после того, как он вычислется:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import threading
import time

res = "Initial value"

def proc():
    time.sleep(1)
    global res
    res = "The thread result"

p = threading.Thread(target=proc)
p.start()

p.join()
print(res)

Мы добавили всего одну строчку, но программа стала работать совершенно по другому. В момент выполнения join основной поток приостанавливается и начинает ждать окончания нового. Таким образом, следующие инструкции будут выполнены уже после окончания вычислений в новом потоке. Конечно, не обязательно ставить join сразу же после start. Вы можете после запуска потока еще что-то сделать в основном, а когда вам понадобится результат - подождать его.

В примере с одним новым потоком программа начинает выполняться строго последовательно. Она запускает поток, ждет его завершения и затем продолжается. Так какой же смысл вообще использовать многопоточность? Разве не тот же самый результат будет если просто вызвать функцию на выполнение? Если мы создаем один новый поток то да, никакого выигрыша мы не получаем. Но зачастую нам нужно создать множество потоков.

Давайте в нашем примере создадим не один поток, а несколько. Это проще всего сделать в цикле:

1
2
3
4
for i in range(5):
    p = threading.Thread(target=proc)
	p.start()
	p.join()

Внимательно разберитесь, как работает эта программа. На каждой итерации цикла мы создаем поток, запускаем его, ждем завершения и переходим на следующую итерацию. Так наша программа тоже будет работать полностью синхронно - мы не продолжаем, пока не закончился очередной поток.

Для того, чтобы воспользоваться преимуществами многопоточности и выполнять потоки параллельно, нам нужно сначала их все создать, а затем их все присоединить. Но так как нам придется обращаться к каждому потоку, нам следует добавить все потоки в массив:

1
2
3
threads = []
for i in range(5):
    threads.append(threading.Thread(target=proc))

Запускать потоки можно и сразу после создания, но можно и отдельно. Причем Python позволяет это сделать очень лаконично и красиво используя генераторное выражение:

1
[p.start() for p in threads]

После запуска мы можем так же и присоединить все потоки:

1
[p.join() for p in threads]

Давайте объединим все в полную программу:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import threading
import time

def proc():
    time.sleep(1)


threads = []
for i in range(5):
    threads.append(threading.Thread(target=proc))

[p.start() for p in threads]

[p.join() for p in threads]

Эта программа представляет собой довольно распространенный шаблон многопоточной программы - в основном потоке мы создаем несколько дочерних, выполняем их параллельно, затем присоединяем их все для обработки результата. Вы можете использовать этот шаблон дополнив его необходимым кодом. Но такая схема применяется почти всегда. При использовании модуля threading можно придумать много разных схем взаимодействия потоков, дочерние потоки могут сами создавать новые потоки, можно присоединять потоки не туда, где они были созданы. Но в практической деятельности сложные схемы взаимодействия потоков применяются редко. Рассмотренный шаблон прост в понимании, достаточно универсален и полезен. В будущем мы рассмотрим и другие шаблоны многозадачного программирования.

{% capture notice-2 %} Выводы:

  1. Часто требуется совершить какие-либо действия после окончания работы потока.
  2. Для этого существует операция присоединения потока.
  3. Текущий поток блокируется и ждет, когда будет закончен заданный.
  4. Это часто требуется, если нужно воспользоваться результатом работы потока.
  5. Надо четко понимать, что кого будет ждать, в потоках легко запутаться.
  6. Поток, в котором выполняется присоединение будет ждать тот поток, у которого оно вызвано.
  7. Чаще всего дочерние потоки присоединяются к главному.
  8. Частый шаблон - массовое создание и затем массовое присоединение потока. {% endcapture %}
{{ notice-2 | markdownify }}

Как использовать замки в потоках?

Как мы уже говорили, при выполнении многопоточных программ могут возникнуть условия, когда несколько потоков одновременно работают с одним ресурсом. Такая ситуация называется “race condition” и может приводить к непредсказуемому поведению программы. Для исправления этой проблемы существуют методы блокировки ресурсов, например уже рассмотренные замки.

В модуле threading присутствует специальная реализация замков, объект Lock, применяя который можно обеспечить потокобезопасность программы. Работа с ним достаточно простая. Рассмотрим ее на примере. Допустим, у нас есть программа, которая работает в потоке с общей переменной. Одна функция в потоке ее увеличивает, а другая - уменьшает:

{% capture block %}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import threading 
deposit = 100

def add_profit(): 
    global deposit
    for i in range(100000):
        deposit = deposit + 10

def pay_bill(): 
    global deposit
    for i in range(100000):
        deposit = deposit - 10

thread1 = threading.Thread(target = add_profit)
thread2 = threading.Thread(target = pay_bill)

thread1.start() 
thread2.start()

thread1.join()
thread2.join()

print(deposit)

{% endcapture %}

{{ block | markdownify }}

По идее, так как мы увеличиваем и уменьшаем значение переменной одинаковое количество раз на одинаковое значение, после выполнения программы, ее значение не должно поменяться. То есть программа должна выдать “100”. Попробуйте запустить такую программу самостоятельно. Алгоритмы очень простой и, вроде бы, ничего опасного мы не делаем. Но все равно, иногда программа выдает значение “-246640”, что явно говорит об ошибке в алгоритме. Даже в таких невинных случаях, потоконебезопасный алгоритм может нас подвести.

А этот алгоритм потоконебезопасный потому, что мы работаем с общими ресурсами. В данном случае - общая переменная deposit. Нам нужно закрыть работу с тим ресурсом под блокировку.

Как мы уже говорили, для каждого общего ресурса нужно создать свой замок. В данном случае он у нас один, поэтому и замок нужно создать один. Его нужно создать в главной программе до объявления функций-потоков. В самих потоках мы перед началом работы получаем замок, а после окончания работы с ресурсом - высвобождаем его:

{% capture block %}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import threading 

lock = threading.Lock()
deposit = 100

def add_profit(): 
    global deposit
    for i in range(100000):
        lock.acquire()
        deposit = deposit + 10
        lock.release()

def pay_bill(): 
    global deposit
    for i in range(100000):
        lock.acquire()
        deposit = deposit - 10
        lock.release()

{% endcapture %}

{{ block | markdownify }}

При использовании замков может случиться одна нестандартная ситуация. Если в процессе работы над ресурсом произошла ошибка или исключение, то поток прервется, но замок останется заблокированным. В этом случае другой поток не сможет получить доступ к этому ресурсу. Чтобы исключить такую ситуацию, надо освобождать замок даже в случае аварийного завершения потока:

{% capture block %}

1
2
3
4
5
6
7
8
def add_profit(): 
    global deposit
    for i in range(100000):
        lock.acquire()
        try:
        	deposit = deposit + 10
        finally:
        	lock.release()

{% endcapture %}

{{ block | markdownify }}

Также можно предусмотреть определенные действия в случае, если замок закрыт. По умолчанию, метод lock.acquire() блокирует поток до тех пор, пока замок не освободиться. Но ему можно передать булев параметр - флаг ожидания. По умолчанию он равен True. Если передать в него False, то в случае занятости замка метод просто вернет состояние замка, без блокировки. Использовать это в программе можно, например, так:

{% capture block %}

1
2
3
4
5
6
7
8
9
def add_profit(): 
    global deposit
    for i in range(100000):
		if not lock.acquire(False):
    		print("Resource is busy")
        try:
        	deposit = deposit + 10
        finally:
        	lock.release()

{% endcapture %}

{{ block | markdownify }}

Для упрощения работы, замки в Python можно использовать как контекстные менеджеры. Если вы не знаете, что это - почитайте подробнее. Примерно так же мы работаем с файлами - с помощью инструкции with:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import threading 

lock = threading.Lock()
deposit = 100

def add_profit(): 
    global deposit
    for i in range(100000):
        with lock:
        	deposit = deposit + 10

def pay_bill(): 
    global deposit
    for i in range(100000):
        with lock:
        	deposit = deposit - 10

Паттерн “Множественное создание потоков”

В многопоточном программировании существует много шаблонов, следуя которым вы сможете легко реализовать любой распространенный многопоточный алгоритм. Один из самых простых шаблонов - это распараллеливание действий по группам. Представим, что нам нужно произвести какое-то вычисление много раз, например, 1000. С другой стороны, нецелесообразно создавать 1000 потоков. Ведь 1000 действий можно разделить равными частями на два потока, на 10, на 50, как угодно.

В таких случаях удобно воспользоваться разбивкой действий на группы. Это работает примерно как пагинация при выводе большого количества объектов данных. В большинстве случаев можно передать в функцию, которая занимается обработкой какого-то объекта из множества номер этого объекта. Допустим, у нас 100 объектов которые надо обработать. И мы хотим создать, например, 5 потоков, каждый из которых будет обрабатывать по 20 объектов.

Обычно мы заранее знаем общее количество объектов, и желаемое количество групп (количество создаваемых потоков). Давайте обозначим это переменными:

1
overall, n_group = 100, 5

Из этих параметров легко вычислить, сколько объектов должен обрабатывать один поток. По определенным причинам нам нужно, чтобы эта переменная была целочисленная:

1
amount = int(overall / n_group)

Мы пока не будем задумываться над тем, что будет, если одно число не делится на другое. Тогда надо предусмотреть округление в большую сторону, проверки не превышение максимального количества объектов. Это довольно просто, но отвлечет нас от главного - смысла шаблона. Можете сами реализовать это в качестве упражнения.

Теперь мы будем создавать в цикле потоки. Каждому потоку мы передадим два параметра - начальный номер объект и количество объектов, которые он должен обработать. Начальный номер тоже достаточно легко вычислить:

1
2
3
4
threads = []
for i in range(n_group):
    start = i * amount 
    threads.append(threading.Thread(target=proc, args=[start, amount]))

Теперь нам осталось набросать, как должна выглядеть сама функция-поток. Она должна принимать эти два параметра и в цикле обрабатывать объекты в этих пределах. Примерно так:

1
2
3
4
def proc(start, amount):
    for i in range(start, start+amount):
        time.sleep(1)
        print(i)

Вы можете брать эту программу за основу. В мелких деталях алгоритм может отличаться, но главный смысл - разбивка множества действия на равные группы для выполнения в нескольких потоков - будет полезным во многих практических приложениях. Давайте соберем воедино всю программу:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import threading
import time

def proc(start, amount):
    for i in range(start, start+amount):
        time.sleep(1)
        print(i)

overall, n_group = 100, 5
amount = int(overall / n_group)

threads = []
for i in range(n_group):
    start = i * amount 
    print(start, amount)
    threads.append(threading.Thread(target=proc, args=[start, amount]))

[p.start() for p in threads]

[p.join() for p in threads]

print("End")

Многопроцессное программирование на Python

Зачем использовать multiprocessing?

Многопоточное программирование очень полезно для оптимизации задач, ограниченных вводом-выводом. Однако на вычислительных задачах, ограниченных центральным процессором использование нескольких потоков не даст прироста скорости. В Python это происходит за счет глобальной блокировки интерпретатора. Поэтому модуль threading не даст возможности ускорять работу программ за счет распараллеливания задач на несколько вычислительных ядер. Для этого нам понадобится механизм создания нескольких процессов и распределения вычислительной нагрузки программы между ними. Именно для этого и нужен модуль multiprocessing.

Модуль multiprocessing является стандартным модулем языка Python с версии 2.6. Работа с ним очень похожа на работу с модулем threading. Вы так же выносите в отдельный объект выполнения определенную часть программы - удобнее всего работать в функциями. Но теперь эта часть загружается в новый отдельный процесс, который специально создается для этого операционной системой.

Процессы, в отличие от потоков, создаются, уничтожаются и управляются операционной системой. Все потоки одного процесса имеют общую область в памяти, где хранятся, например, переменные программы. Процессы же - полностью изолированные сущности, которые не имеют никаких общих участков. С одной стороны, это сильно облегчает обеспечение потокобезопасности многопроцессных программы - ведь большую часть работы берет на себя операционная система.

В работе многопроцессных программ на интерпретируемых языках программирования есть существенная особенность. Программы на компилируемых языках существуют в виде машинного кода и выполняются непосредственно на процессоре. Выполнение интерпретируемых программ, скриптов, устроено иначе. При “запуске” программы, которая является всего лишь текстовым файлом, на самом деле запускается интерпретатор, который читает текст скрипта, переводит в машинный код и выполняет его строчка за строчкой. Точно так же приходится поступать, если программа порождает новый процесс. В новый процесс загружается копия интерпретатора Python. Вы можете это увидеть сами в диспетчере задач или в выводе команды top. Вы увидете несколько процессов, обозначенных “python.exe” (на Windows).

Интерпретатор Python - это большая и сложная программа. И создание нового потока на Python занимает довольно много времени и памяти. По меркам компьютера, конечно, для человека все происходит мгновенно. Но именно по этой причине, процессы в Python масштабируются не так хорошо, как потоки. То есть при увеличении часла процессов, которые создаются в программе, увеличиваются накладные расходы на создание и переключение процессов. Поэтому делать очень много процессов обычно не имеет смысла. Как правило, в многопроцессных программах количество процессов, на которые распараллеливается программа, определяется количеством ядер центрального процессора компьютера. Это не жесткое требование, но некоторый ориентир. Конечно, все зависит от конкретной реализации алгоритма и архитектуры программы. Но в любом случае, речь идет о единицах, в крайнем случае - десятках процессов. В то время, как потоки могут создаваться сотнями и тысячами.

При создании процесса кроме интерпретатора в него загружается копия всех ресурсов программы. То есть, из потока вы можете иметь доступ ко всем глобальным переменным, которые были инициализированы до создания нового процесса. Но это именно копии, вы не можете, как в процессах, обмениваться информацией между процессами, изменяя значение общих переменных, ведь у каждого процесса теперь своя копия всех переменных. Это затрудняет обмен данными между процессами, но зато сильно облегчает проектирование программы, потому что меньше приходится думать о потокобезопасности. А в модуле multiprocessing есть специальные инструменты для удобного обмена данными, которых не было в модуле threading.

Но несмотря на все трудности, многопроцессный подход имеет свои ключевые преимущества, и области применения. Как мы уже говорили, только многопроцессность позволяет на Python добится ускорения вычислительных задач. Многопоточность здесь не поможет из-за глобальной блокировки интерпретатора. Но еще многопроцессность применяется для большей изоляции компонентов программы. Например, во многих современных браузерах каждая новая вкладка открывается в новом процессе. Это нужно для того, чтобы, если при отображении одной веб-страницы произошла непредвиденная ошибка, приведшая к зависанию или аварийному завершению процесса, то это не приведет к закрытию браузера в целом. Так как все потоки изолированные, когда “вылетает” один из них, остальные продолжают работать. По этой же причине внешние модули тоже часто выполняются в отдельных процессах, чтобы непроверенный сторонний код не обрушивал всю программу целиком.

{% capture notice-2 %} Выводы:

  1. Модуль multiprocessing был добавлен в Python версии 2.6.
  2. Используя multiprocessing вы можете обойти GIL.
  3. Входит в стандартную библиотеку Python.
  4. Позволяет запускать задачи в разных процессах.
  5. Процессы управляются операционной системой.
  6. Каждый процесс имеет свою копию интерпретатора и всех ресурсов.
  7. Процессы питона - очень тяжеловесные, ведь у каждого свой интерпретатор.
  8. Позволяет получить прирост производительности на многоядерных системах. {% endcapture %}
{{ notice-2 | markdownify }}

Как выделить функцию в процесс?

Основным классом, с которым приходится работать в модуле multiprocessing является класс Process. Он очень похож на класс Thread и позволяет создать несколько процессов, которые вызывают одну и ту же функцию:

{% capture block %}

1
2
3
4
5
6
7
8
from multiprocessing import Process

def worker(name):
    print('Hello from process!')

if __name__ == '__main__':
    p = Process(target=worker)
    p.start()

{% endcapture %}

{{ block | markdownify }}

В данном примере мы объявляем функцию, которая будет выполняться в отдельном процессе. Затем мы создаем объект Process и запускаем его на выполнение точно так же, как и в модуле threading при помощи метода start(). Но в отличии от объекта Thread, при вызове этого метода в объекте Process программа отправляет операционной системе запрос на создание нового процесса. После создания в новый процесс загружается копия интерпретатора Python со всеми ресурсами, в том числе со всеми переменными, созданными в основной программе до порождения потока.

Из-за особенностей реализации процессов, которые мы уже обсуждали, в функции, которая выполняется в процессе нельзя использовать глобальные переменные. Вернее, использовать их можно, но они у каждого процесса будут свои, а не общие, как в случае с потоками. Так же не получится просто так использовать return для возврата результата работы функции.

Обратите внимание, что выделить в процесс можно только верхнеуровневую функцию. потоки не работают с методами или вложенными функциями. Так же обязательно использовать в основном коде программы конструкцию if __name__ == '__main__':. Ведь во все дочерние процессы изначальная программа будет загружаться как модуль. Это особенность реализации процессов в Python. И если вы не будете использовать эту конструкцию, то в новых процессах будет выполняться основной код программы, в том числе инструкции создания новых потоков. Это очень оригинальный способ уйти в бесконечную рекурсию, который чреват зависанием всей операционной системы.

Рассмотрим уже знакомый нам пример - массовое создание процессов. В этом модуле все происходит полностью аналогично threading - работает присоединение процесса, передача параметров в функцию-процесс. Для этого примера возьмем вычислительный пример - проверка числа на простоту:

{% capture block %}

1
2
3
4
5
6
7
def worker(n):
	if n < 2: pass
	i = 2
	while i*i <= n:
		if n % i == 0:
			return
	print(x)

{% endcapture %}

{{ block | markdownify }}

Эта функция, конечно, неоптимизирована, но нам для примера подойдет. Она печатает число только, если это число простое. Функция проверяет, делится ли заданное число на все числа от 2 до корня из заданного. Обратите внимание, как в этой функции используется return - только для управления циклом, вместо break. Вообще-то эта функция ничего не возвращает.

Для больших чисел эта функция будет работать довольно долго. Причем она чисто вычислительная, то есть это задача, ограниченная процессором. Этого нам и нужно. Оптимизировать ее с помощью многопоточности и модуля threading бесполезно. Давайте вызовем ее в нескольких процессах:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from multiprocessing import Process
 
if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    procs = []
    
    for number in numbers:
        proc = Process(target=worker, args=(number,))
        procs.append(proc)
        proc.start()
    
    for proc in procs:
        proc.join()

    print('Finished.')

Код этой программы уже знаком тем, кто раньше работал с потоками в Python. точно так же происходит передача аргументов в поток. Поэтому в целом данная программа повторяет шаблон “создание потоков на группы задач”.

{% capture notice-2 %} Выводы:

  1. Синтаксис аналогичен созданию потока. По внутреннему устройству они сильно различаются.
  2. В каждый процесс загружается собственная копия интерпретатора и всех переменных.
  3. Поэтому глобальных переменных не будет - у каждого потока свой стек.
  4. Вернуть значение из процесса еще сложнее, ведь они изолированные.
  5. Довольно частый шаблон - использование пула потоков и функции map. {% endcapture %}
{{ notice-2 | markdownify }}

Как получить служебную информацию о процессе?

При работе с процессами иногда бывает необходимо в самой функции получить информацию о процессе, в котором она выполняется. Модуль multiprocessing предоставляет доступ к объекту current_process, через который можно получить информацию о текущем процессе. Например, имя процесса. Имя можно задать явно при создании объекта процесса, либо оно присвоится автоматически.

Еще бывает полезно проверить значение переменной __name__, которая хранит информацию о названии текущего модуля. Но более полезная информация доступна через модуль os. Как известно, этот стандартный модуль Python позволяет получить доступ к некоторым функция операционной системы. В том числе методы получения идентификатора процесса и идентификатора родительского процесса. Рассмотрим пример:

{% capture block %}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import os
from multiprocessing import Process, current_process
 
def worker():
    print('process name:'current_process().name) 
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
 
if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    procs = [Process(target=worker) for number in numbers]

    [proc.start() for proc in procs]

    [proc.join() for proc in procs]

{% endcapture %}

{{ block | markdownify }}

После запуска эта программа напечатает вывод, похожий на следующий:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
process name: Process-1
parent process: 404571
process id: 404572

process name: Process-2
parent process: 404571
process id: 404573

process name: Process-3
parent process: 404571
process id: 404574

process name: Process-4
parent process: 404571
process id: 404575

process name: Process-5
parent process: 404571
process id: 404576

Обратите внимание, что идентификаторы процессов все разные, но идентификатор родительского процесса один и тот же.

Использование замков в процессах

При использовании процессов гораздо реже возникают ситуации доступа к общим ресурсам, как в многопоточности, ведь процессы - более изолированные сущности. Но все равно такие случаи могут случаться. Например, довольно частый случай - одновременный доступ к консоли, к внешним устройствам, к сети. Все процессы, порожденные в модуле miltiprocessing будут привязаны к одному терминалу и поэтому вывод между ними может путаться.

Для решения этой проблемы в модуле так же присутствуют замки. Их назначение и применение полностью аналогично замкам их модуля threading, которые мы рассматривали ранее:

1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing import Process, Lock

def printer(item, lock):
    with lock:
        print(item)

if __name__ == '__main__':
    lock = Lock()
    items = ['tango', 'foxtrot', 10]
    
    for item in items:
        p = Process(target=printer, args=(item, lock))
        p.start()

Зачем нужен пул процессов?

Как мы говорили ранее, процессы масштабируются гораздо хуже, чем потоки. Довольно часто нам нужно создать какое-то количество процессов, меньшее, чем количество задач. В таком случае, каждый процесс будет обрабатывать несколько задач. Мы уже рассматривали шаблон “Множественное создание потоков”. В модуле multiprocessing все гораздо проще - специально для этого существует объект Pool.

Пул процессов - это набор из определенного количества процессов, которым автоматически передаются на выполнение задачи из списка. Причем количество задач никак не связано с количеством процессов. Количество процессов задается при создании пула так:

1
pool = Pool(processes=3)

После создания, можно использовать этот пул для распараллеливания определенного количества задач. Для этого используется довольно известная функция map. Она принимает на вход два аргумента - функцию и массив, вызывает функцию для каждого элемента массива и возвращает массив результатов:

1
pool.map(doubler, numbers)

Как бонус, пул процессов еще и заботится об обмене данными между дочерними процессами и родительским. Так что использование пула процессов сильно облегчает проектирование многопроцессных программ. Механику работы с пулом процессов можно наглядно увидеть из примера:

1
2
3
4
5
6
7
8
9
from multiprocessing import Pool

def doubler(number):
    return number * 2

if __name__ == '__main__':
    numbers = [5, 10, 20, 25, 30]
    pool = Pool(processes=3)
    print(pool.map(doubler, numbers))

Главной трудностью работы с пулом процессов является то, что он рассчитан на функцию, которая принимает один аргумент. Если функция, которую надо распараллелить, принимает несколько аргументов, существует альтернативный метод - starmap, либо можно просто переписать ее так, чтобы она принимала кортеж.

Использование очереди

1
2
3
4
5
6
7
8
9
10
11
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # распечатает "[42, None, 'hello']"
    p.join()

Использование конвейера

1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # распечатает "[42, None, 'hello']"
    p.join()

Паттерн “Производитель-потребитель”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from multiprocessing import Process, Queue  

sentinel = -1 

def creator(data, q):
    print('Creating data and putting it on the queue')
    for item in data:
        q.put(item)  

def my_consumer(q):
    while True:
        data = q.get()
        if data is sentinel:
            break 
        print('data found to be processed: {}'.format(data))    
        processed = data * 2
        print(processed)

if __name__ == '__main__':
    q = Queue()
    data = [5, 10, 13, -1]    
    process_one = Process(target=creator, args=(data, q))
    process_two = Process(target=my_consumer, args=(q,))    
    process_one.start()
    process_two.start()    
    q.close()
    q.join_thread()    
    process_one.join()
    process_two.join()




Практика: Многопоточный сервер

В данной работе мы познакомимся с приемами работы с многопоточностью на примере создания сокетного TCP-сервера, способного работать с несколькими клиентами одновременно.

Видео занятия

Методические указания

Читать методический материал к этому занятию.



Практика: Мультипроцессные вычисления

В ходы выполнения этой работы вы научитесь создавать в Python параллельные программы, выполняющие вычисления в несколько процессов.

Видео занятия

Методические указания

Читать методический материал к этому занятию.



Практика: Библиотека asyncio

Здесь мы познакомимся с основными понятиями библиотеки asyncio - основного средства языка Python для реализации асинхронных программ. Осторожно: синтаксис часто меняется!

Методические указания

Читать методический материал к этому занятию.



Разделы:

Дата изменения: