Skip to content

Latest commit

 

History

History
1210 lines (818 loc) · 59.1 KB

File metadata and controls

1210 lines (818 loc) · 59.1 KB

Лекция 31. Celery. Multithreading. GIL. Multiprocessing

Что надо знать до Celery

Процессы

Процесс — экземпляр программы во время выполнения, независимый объект, которому выделены системные ресурсы (например, процессорное время и память). Каждый процесс выполняется в отдельном адресном пространстве: один процесс не может получить доступ к переменным и структурам данных другого. Если процесс хочет получить доступ к чужим ресурсам, необходимо использовать межпроцессное взаимодействие. Это могут быть конвейеры, файлы, каналы связи между компьютерами и многое другое.

Синхронным (synchronous) называется такое взаимодействие между компонентами, при котором клиент, отослав запрос, блокируется и может продолжать работу только после получения ответа от сервера. По этой причине такой вид взаимодействия называют иногда блокирующим (blocking).

В рамках асинхронного (asynchronous) или неблокирующего (non blocking) взаимодействия клиент после отправки запроса серверу может продолжать работу, даже если ответ на запрос еще не пришел. Асинхронное взаимодействие позволяет получить более высокую производительность системы за счет использования времени между отправкой запроса и получением ответа на него для выполнения других задач. Другое важное преимущество асинхронного взаимодействия — меньшая зависимость клиента от сервера, возможность продолжать работу, даже если машина, на которой находится сервер, стала недоступной. Это свойство используется для организации надежной связи между компонентами, даже если и клиент, и сервер не все время находятся в рабочем состоянии.

Что это вообще такое Celery?

Celery – это система для управления очередями задач. Принципиально умеет 2 вещи: брать задачи из очереди и выполнять задачи по расписанию.

Celery - это распределённая очередь задач, реализованная на языке Python.

Celery - это простая, гибкая и надежная распределенная система для обработки огромного количества сообщений, включая в себя инструменты, необходимые для поддержки такой системы.

Это очередь задач с упором на обработку в реальном времени, а также с поддержкой планирования задач.

Celery имеет открытый исходный код и находится под лицензией BSD.

Итак, что же умеет Celery:

  • Выполнять асинхронно задания
  • Выполнять периодические задания (умная замена cron)
  • Выполнять отложенные задания
  • Распределенное выполнение (может быть запущен на N серверах)
  • В пределах одного worker’а возможно конкурентное выполнение нескольких задач (одновременно)
  • Выполнять задание повторно, если вылез exception
  • Ограничивать количество заданий в единицу времени (rate limit для задания или глобально)
  • Несложно мониторить выполнение заданий
  • Выполнять подзадания
  • Присылать отчеты об exception’ах
  • Проверять выполнилось ли задание

Task (Задача)

Задачей является предварительно написанный код (чаще всего функция), предназначенный для выполнения определённой цели (отправка имейла, обработка файла, и т. д.)

Broker (Брокер)

Брокер сообщений (он же диспетчер очереди) — это посредник(транспорт), который принимает и отдает сообщения (задачи) между отдельными модулями/приложениями внутри некоторой сложной системы, где модули/приложения должны общаться между собой — то есть пересылать данные друг другу.

Брокером может выступать как специальное ПО, например, RabbitMQ, так и некоторые NoSQL, например Redis. О них подробнее ниже.

Worker (Воркер)

Воркер - это отдельно запущенный процесс для выполнения определённых задач, Celery запускается на одном или нескольких воркерах, чтобы выполнять задачи параллельно на каждом воркере.

Back-end (Бэкэнд)

В рамках Celery бэкэнд выступает в качестве хранилища результатов выполнения задач. Это может быть как SQL, так и NoSQL база данных. Хотя, по сути что угодно может быть хранилищем, хоть обычный файл (я таких реализаций не встречал, но технически возможно).

  • Producer (поставщик) ‒ программа, отправляющая сообщения. В нашем случае, это чаще всего будет Django.

  • Queue (очередь) ‒ очередь сообщений (задач). Она существует внутри брокера. Любое количество поставщиков может отправлять сообщения в одну очередь, также любое количество подписчиков может получать сообщения из одной очереди. В схемах очередь будет обозначена стеком и подписана именем. Чаще всего за очередь будет отвечать Redis.

  • Consumer (подписчик) ‒ программа, принимающая сообщения. Обычно подписчик находится в состоянии ожидания сообщений. Это будет процесс Celery, который запустили специально для этой цели. Обрабатывает задачи, и складывает результат в backend.

Поставщик, подписчик и брокер не обязаны находиться на одной физической машине.

Брокеры

AMQP

AMQP (Advanced Message Queuing Protocol) — открытый протокол для передачи сообщений между компонентами системы. Основная идея состоит в том, что отдельные подсистемы (или независимые приложения) могут обмениваться произвольным образом сообщениями через AMQP-брокер, который осуществляет маршрутизацию, возможно гарантирует доставку, распределение потоков данных, подписку на нужные типы сообщений.

RabbitMQ

RabbitMQ – это брокер сообщений с открытым исходным кодом. Он маршрутизирует сообщения по всем базовым принципам протокола AMQP, описанным в спецификации. Отправитель передает сообщение брокеру, а тот доставляет его получателю. RabbitMQ реализует и дополняет протокол AMPQ.

Redis

Redis (Remote Dictionary Server) – это быстрое хранилище данных типа «ключ‑значение» в памяти с открытым исходным кодом для использования в качестве базы данных, кэша, брокера сообщений или очереди.

Для Celery крайне рекомендую использовать именно Redis, годами проверенное решение.

Установка необходимого ПО

Celery

Для установки celery мы можем использовать pip:

pip install celery

Celery 4.0+ официально уже не поддерживается для Windows

Варианты запуска

  1. Использовать Linux

  2. Docker

  3. WSL 2 (для Windows 10)

  4. Переменная окружения или прямо в коде

Redis

Установка самого сервиса

sudo apt install redis-server

Для линукса, или Windows

Для работы также необходима и библиотека

pip install redis

Все три процесса должны быть запущены одновременно! И Python, который будет отправлять сообщения, и Redis, который будет очередью, и Celery worker, который будет выполнять задачи

Celery и Python

Celery и Windows

Если вы используете Windows то для того, чтобы все следующие примеры работали, необходимо использовать только конкретные версии пакетов и версию Python 3.6!

А вообще не надо этим заниматься на винде :)

python == 3.6
celery == 3.1.25
redis == 2.10.6

Простейший пример

Создадим файл tasks.py

Для использования необходимо создать "приложение", в котором необходимо указать название и брокера.

from celery import Celery

broker_url = 'redis://localhost'
app = Celery('tasks', broker=broker_url)


@app.task  # декорирование функции для использования её через Celery 
def add(x, y):
    return x + y

Мы не вызывали задачу!!

Для того чтобы мы могли вызвать задачу, необходимо, чтобы у вас были запущены два отдельных приложения, первое Redis Server:

Запускаем и оставляем работать!

Celery запускать нужно при запущенном виртуальном окружении!

celery -A tasks worker --loglevel=INFO

Также запускаем и не закрываем!

-A app_name - имя приложения, worker - запустить воркер, loglevel - уровень деталей отображаемой информации.

Запуск и обработка результата

Для запуска задач есть много разных способов, тут рассмотрим базовый.

Открываем консоль:

from tasks import add

add.delay(4, 4)

Для запуска задачи немедленно используется метод delay.

Запуск задач возвращает не результат, а AsyncResult, для того чтобы получать значения, необходимо при создании приложения указать параметр backend, который отвечает за то, где будут храниться результаты, таким параметром может быть Redis:

broker_url = 'redis://localhost'
app = Celery('tasks', broker=broker_url, backend=broker_url)

Обратите внимание, мы используем Redis и в качестве брокера, и в качестве бэкэнда сразу.

Результат будет иметь достаточно большое кол-во методов и атрибутов.

Основные два метода это ready() и get():

ready() - булево поле, которое отвечает за то, завершилась задача или еще в процессе.

get() - ждет выполнения задачи и возвращает результат. Рекомендуется использовать после ready(), чтобы не ждать выполнения впустую.

result = add.delay(4, 4)
result.ready()
True
result.get()
8

Иногда описание параметров задачи и ее вызов могут быть в совершенно разных местах, для этого существует механизм подписи:

s1 = add.s(2, 2)
res = s1.delay()
res.get()

В этом примере s1 - это подпись задачи, то есть задача, заготовленная для выполнения, её можно сериализовать и отправить по сети, например, а выполнить в уже совершенно других местах.

Или если вы не знаете параметры целиком:

# incomplete partial: add(?, 2)
s2 = add.s(2)
# resolves the partial: add(8, 2)
res = s2.delay(8)
res.get()

Задачи можно группировать:

from celery import group
from proj.tasks import add

group(add.s(i, i) for i in range(10))().get()

Виды запуска

Есть три варианта запуска задач:

apply_async(args[, kwargs[, …]])

Отправка сообщения с указанием дополнительных параметров:

delay(*args, **kwargs)

Отправка сообщения без каких-либо параметров самого сообщения:

calling (__call__)

Просто вызов, декоратор не мешает нам просто вызвать функцию без Celery. :)

Основные параметры apply_async()

  1. сountdown - выполнить через определённый промежуток времени
add.apply_async((2, 2), countdown=10)
# выполнить через 10 секунд
  1. eta - выполнить в конкретное время
add.apply_async((2, 2), eta=now() + timedelta(seconds=10))
# выполнить через 10 секунд
  1. expires - время, после которого перестать выполнять задачу, можно указать как цифру, так и время
add.apply_async((4, 5), countdown=60, expires=120)
add.apply_async((4, 5), expires=now() + timedelta(days=2))
  1. link - выполнить другую задачу по завершению текущей, основываясь на результатах текущей
add.apply_async((2, 2), link=add.s(16))
# ( 2 + 2 ) + 16

Периодические задачи

Celery может выполнять какие-либо задачи просто по графику.

Для этого нужно настроить приложение:

app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
}
app.conf.timezone = 'UTC'

Ключ словаря - это только название, можно указать что угодно.

Таск - это выполняемый таск. :)

args - его аргументы.

schedule - частота выполнения в секундах.

Выполнение по CRON

from celery.schedules import crontab

app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}

Cron - система задания расписания, можно сделать практически какое угодно.

По движению солнца

from celery.schedules import solar

app.conf.beat_schedule = {
    # Executes at sunset in Melbourne
    'add-at-melbourne-sunset': {
        'task': 'tasks.add',
        'schedule': solar('sunset', -37.81753, 144.96715),
        'args': (16, 16),
    },
}

В данном случае выполнять во время заката по указанным координатам, параметров много, например, закат с учётом зданий ;)

Для запуска по расписанию нужно запускать отдельный воркер для расписания (смотреть в доке)

Celery и Django

Для использования Celery в Django рекомендуется создать еще один файл celery.py на одном уровне с settings.py

from __future__ import absolute_import

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoProject1.settings')

from django.conf import settings  # noqa

app = Celery('djangoProject1')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) - эта строчка будет отвечать за автоматический поиск таков во всех приложениях.

На том же уровне, где и settings.py создать\использовать файл __init__.py в зависимости от версии Python.

# __init__.py
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.

from .celery import app as celery_app

__all__ = ('celery_app',)

Все задачи необходимо покрывать не стандартным декоратором task, а декоратором shared_task, тогда Django сможет автоматически найти все таски в приложении.

# tasks.py

from celery import shared_task
from demoapp.models import Widget


@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)


@shared_task
def count_widgets():
    return Widget.objects.count()


@shared_task
def rename_widget(widget_id, name):
    w = Widget.objects.get(id=widget_id)
    w.name = name
    w.save()

Также для Django существует много различных расширений, например:

django-celery-results - чтобы хранить результаты в БД или кеше Django, за подробностями в доку.

django-celery-beat - настройка для периодических задач, сразу вшитая в админку Django, за подробностями опять же в доку.

Multithreading. Multiprocessing. GIL.

О потоках и их истоках

Чтобы понять многопоточность, сначала вникнем, что такое процесс. Процесс – это часть виртуальной памяти и ресурсов, которую ОС выделяет для выполнения программы. Если открыть несколько экземпляров одного приложения, под каждый система выделит по процессу. В современных браузерах за каждую вкладку может отвечать отдельный процесс.

Вы наверняка сталкивались с «Диспетчером задач» в Windows (в Linux — «Системный монитор») и знаете, что лишние запущенные процессы грузят систему, а самые «тяжёлые» из них часто зависают, так что их приходится завершать принудительно.

Но пользователи любят многозадачность: хлебом не корми — дай открыть с десяток окон и попрыгать туда-сюда. Налицо дилемма: нужно обеспечить одновременную работу приложений и при этом снизить нагрузку на систему, чтобы она не тормозила. Допустим, «железу» не угнаться за потребностями владельцев — нужно решать вопрос на программном уровне.

Мы хотим, чтобы в единицу времени процессор успевал выполнить больше команд и обработать больше данных. То есть нам надо уместить в каждом кванте времени больше выполненного кода. Представьте единицу выполнения кода в виде объекта — это и есть поток.

К сложному делу легче подступиться, если разбить его на несколько простых. Так и при работе с памятью: «тяжёлый» процесс делят на потоки, которые занимают меньше ресурсов и скорее доносят код до вычислителя (как именно — см. ниже).

У каждого приложения есть как минимум один процесс, а у каждого процесса — минимум один поток, который называют главным, и из которого при необходимости запускают новые.

Разница между потоками и процессами

  • Потоки используют память, выделенную под процесс, а процессы требуют себе отдельное место в памяти. Поэтому потоки создаются и завершаются быстрее: системе не нужно каждый раз выделять им новое адресное пространство, а потом высвобождать его.

  • Процессы работают каждый со своими данными — обмениваться чем-то они могут только через механизм межпроцессного взаимодействия. Потоки обращаются к данным и ресурсам друг друга напрямую: что изменил один — сразу доступно всем. Поток может контролировать «собратьев» по процессу, в то время как процесс контролирует исключительно своих «дочек». Поэтому переключаться между потоками быстрее и коммуникация между ними организована проще.

Какой отсюда вывод? Если вам нужно как можно быстрее обработать большой объём данных, разбейте его на куски, которые можно обрабатывать отдельными потоками, а затем соберите результат воедино. Это лучше, чем плодить жадные до ресурсов процессы.

Но почему такое популярное приложение как Firefox идёт по пути создания нескольких процессов? Потому что именно для браузера изолированная работа вкладок — это надёжно и гибко. Если с одним процессом что-то не так, необязательно завершать программу целиком — есть возможность сохранить хотя бы часть данных.

Что такое многопоточность? Вот мы и подошли к главному. Многопоточность — это когда процесс приложения разбит на потоки, которые параллельно — в одну единицу времени — обрабатываются процессором.

Вычислительная нагрузка распределяется между двумя или более ядрами, так что интерфейс и другие компоненты программы не замедляют работу друг друга.

Многопоточные приложения можно запускать и на одноядерных процессорах, но тогда потоки выполняются по очереди: первый поработал, его состояние сохранили — дали поработать второму, сохранили — вернулись к первому или запустили третий, и т. д.

Multithreading (Многопоточность)

Что такое поток?

В информатике поток — это минимальная единица работы, запланированная для выполнения операционной системой.

О потоках нужно знать следующее:

  • Они существуют внутри процесса;
  • В одном процессе может быть несколько потоков;
  • Потоки в одном процессе разделяют состояние и память родительского процесса.
  • Потоки работают параллельно.

В Python существует встроенный модуль threading, самым простым примером использования будет следующий код:

import time
from threading import Thread


def sleep_me(i):
    print("Поток %i засыпает на 5 секунд.\n" % i)
    time.sleep(5)
    print("Поток %i сейчас проснулся.\n" % i)


for i in range(10):
    th = Thread(target=sleep_me, args=(i,))
    th.start()

Вывод будет примерно следующим:

Поток 0 засыпает на 5 секунд.
Поток 3 засыпает на 5 секунд.
Поток 1 засыпает на 5 секунд.
Поток 4 засыпает на 5 секунд.
Поток 2 засыпает на 5 секунд.
Поток 5 засыпает на 5 секунд.
Поток 6 засыпает на 5 секунд.
Поток 7 засыпает на 5 секунд.
Поток 8 засыпает на 5 секунд.
Поток 9 засыпает на 5 секунд.
Поток 0 сейчас проснулся.
Поток 3 сейчас проснулся.
Поток 1 сейчас проснулся.
Поток 4 сейчас проснулся.
Поток 2 сейчас проснулся.
Поток 5 сейчас проснулся.
Поток 6 сейчас проснулся.
Поток 7 сейчас проснулся.
Поток 8 сейчас проснулся.
Поток 9 сейчас проснулся.

Порядок может быть вообще любым, и мы этот порядок не контролируем!

threading.active_count()

Эта функция возвращает количество исполняемых на текущий момент потоков. Изменим последнюю программу, чтобы она выглядела вот так:

import time
import threading
from threading import Thread


def sleep_me(i):
    print("Поток %i засыпает на 5 секунд." % i)
    time.sleep(5)
    print("Поток %i сейчас проснулся." % i)


for i in range(10):
    th = Thread(target=sleep_me, args=(i,))
    th.start()
    print("Запущено потоков: %i." % threading.active_count())

Результат будет примерно такой:

Поток 0 засыпает на 5 секунд.
Запущено потоков: 2.
Поток 1 засыпает на 5 секунд.
Запущено потоков: 3.
Поток 2 засыпает на 5 секунд.
Запущено потоков: 4.
Поток 3 засыпает на 5 секунд.
Запущено потоков: 5.
Поток 4 засыпает на 5 секунд.
Запущено потоков: 6.
Поток 5 засыпает на 5 секунд.
Запущено потоков: 7.
Поток 6 засыпает на 5 секунд.
Запущено потоков: 8.
Поток 7 засыпает на 5 секунд.
Запущено потоков: 9.
Поток 8 засыпает на 5 секунд.
Запущено потоков: 10.
Поток 9 засыпает на 5 секунд.
Запущено потоков: 11.
Поток 0 сейчас проснулся.
Поток 5 сейчас проснулся.
Поток 2 сейчас проснулся.
Поток 9 сейчас проснулся.
Поток 3 сейчас проснулся.
Поток 7 сейчас проснулся.
Поток 1 сейчас проснулся.
Поток 8 сейчас проснулся.
Поток 6 сейчас проснулся.
Поток 4 сейчас проснулся.

Также обратите внимание, что после запуска всех потоков счетчик показывает число 11, а не 10. Причина в том, что основной поток также учитывается наравне с 10 остальными.

🧨 Потенциальные проблемы многопоточности

Многопоточность даёт мощные возможности, но и приносит с собой множество рисков. Ниже перечислены типичные проблемы, с которыми сталкиваются при работе с потоками.

🪤 1. Race Condition (состязание потоков)

Что происходит: два или более потоков одновременно обращаются к общей переменной или ресурсу, и порядок доступа влияет на результат.

Пример: один поток увеличивает переменную, другой — сбрасывает её в ноль. Итог зависит от того, кто первый.

counter = 0

def increment():
    global counter
    for _ in range(100000):
        counter += 1

# Запускаем два потока, которые инкрементируют счётчик

Проблема: результат может быть не 200000, а что угодно меньше.


🧱 2. Deadlock (взаимоблокировка)

Что происходит: два потока захватывают ресурсы и ждут друг друга, но никто не может продолжить.

Аналогия: два человека в дверном проёме: один держит ручку с одной стороны, другой — с другой. Никто не уступает.

lock1 = Lock()
lock2 = Lock()

# Поток A
with lock1:
    with lock2:
        pass

# Поток B
with lock2:
    with lock1:
        pass

Проблема: оба потока заблокированы и никогда не завершатся.


🍽️ 3. Starvation (голодание)

Что происходит: один поток постоянно захватывает ресурс, не давая другим выполнить свои задачи.

Аналогия: официант всегда обслуживает только одного клиента, игнорируя остальных.

Проблема: потоки остаются в ожидании, даже если они готовы к выполнению.


🔁 4. Livelock (живой тупик)

Что происходит: потоки не блокируются, но находятся в постоянном цикле попыток и отступлений.

Аналогия: два вежливых человека в узком проходе, которые одновременно уступают дорогу друг другу снова и снова.

# Потоки пытаются избежать взаимоблокировки, но делают это одновременно — и не продвигаются

🧠 5. Контекстные переключения (Context Switching Overhead)

Что происходит: частая смена контекста между потоками может отнимать больше времени, чем сама работа.

Проблема: если у нас 1000 потоков, и каждый делает почти ничего, на переключение между ними уйдёт куча ресурсов.


🧩 6. Проблемы синхронизации

Что происходит: отсутствие или неправильное использование синхронизации (например, забыли lock.release() или забыли acquire()).

Проблема: ошибки становятся неочевидными, нестабильными и трудно воспроизводимыми.


🔎 Эти проблемы — основная причина, почему нужно умело использовать синхронизацию: Lock, RLock, Semaphore, Event, Condition и т.д. С ними мы познакомимся дальше.

Синхронизация потоков

🔐 Синхронизация потоков

Когда несколько потоков пытаются одновременно изменить общий объект, возможны конфликты. Представьте, что 5 человек пишут одну строку на доске с мелом: выйдет месиво HWeol,rldo вместо Hello, World.

Чтобы избежать этого, используются механизмы синхронизации.


🔐 Lock — обычный замок

from threading import Lock, Thread

lock = Lock()

def task(data, value):
    with lock:
        data.append(value)

Аналогия: один туалет для всех: кто-то вошёл — другие ждут.


🔀 RLock — для рекурсии и вложенных входов

from threading import RLock

lock = RLock()

def recursive(n):
    if n <= 0:
        return
    with lock:
        print(f"Level {n}")
        recursive(n - 1)

Аналогия: вы в комнате с несколькими дверями и одним ключом для себя.


🚥 Semaphore — ограниченное число мест

from threading import BoundedSemaphore, Thread
from time import sleep

sema = BoundedSemaphore(value=3)

def park_car(num):
    with sema:
        print(f"Car {num} parked")
        sleep(1)
        print(f"Car {num} leaves")

for i in range(5):
    Thread(target=park_car, args=(i,)).start()

Аналогия: парковка на 3 места — дальше очередь.


🚦 Event — зелёный свет

from threading import Event, Thread
from time import sleep

event = Event()

def runner(name):
    print(f"{name} ready...")
    event.wait()
    print(f"{name} runs!")

for i in range(3):
    Thread(target=runner, args=(f"Runner {i}",)).start()

sleep(2)
print("GO!")
event.set()

👮 Condition — охранник с рацией

from threading import Condition, Thread
from time import sleep

cond = Condition()
data_ready = False

def worker():
    with cond:
        while not data_ready:
            cond.wait()
        print("Working with data")

def notifier():
    global data_ready
    sleep(2)
    with cond:
        data_ready = True
        cond.notify_all()

Thread(target=worker).start()
Thread(target=notifier).start()

✅ Таблица: что когда использовать

Сценарий Примитив
Один в руку — другие ждут Lock
Нужно рекурсивно захватить замок RLock
Не более N одновременно Semaphore
Ждем сигнал Event
Ожидаем особое условие Condition

GIL. Global Interpreter Lock

Шикарная статья на тему

Python Global Interpreter Lock (GIL) — это своеобразная блокировка, позволяющая только одному потоку управлять интерпретатором Python. Это означает, что в любой момент времени будет выполняться только один конкретный поток.

Работа GIL может казаться несущественной для разработчиков, создающих однопоточные программы. Но во многопоточных программах отсутствие GIL может негативно сказываться на производительности процессоро-зависимых программ.

Поскольку GIL позволяет работать только одному потоку даже в многопоточном приложении, он заработал репутацию «печально известной» функции.

Фактически Python не вызывает много потоков одновременно, а только очень быстро их переключает, что делает все многопоточные вычисления по факту однопоточными.

Что за проблему в Python решает GIL?

Python подсчитывает количество ссылок для корректного управления памятью. Это означает, что созданные в Python объекты имеют переменную подсчёта ссылок, в которой хранится количество всех ссылок на этот объект. Как только эта переменная становится равной нулю, память, выделенная под этот объект, освобождается.

Вот небольшой пример кода, демонстрирующий работу переменных подсчёта ссылок:

import sys

a = []
b = a
sys.getrefcount(a)
3

В этом примере количество ссылок на пустой массив равно 3. На этот массив ссылаются: переменная a, переменная b и аргумент, переданный функции sys.getrefcount().

Проблема, которую решает GIL, связана с тем, что в многопоточном приложении сразу несколько потоков могут увеличивать или уменьшать значения этого счётчика ссылок. Это может привести к тому, что память очистится неправильно и удалится тот объект, на который ещё существует ссылка.

Счётчик ссылок можно защитить, добавив блокираторы на все структуры данных, которые распространяются по нескольким потокам. В таком случае счётчик будет изменяться исключительно последовательно.

Но добавление блокировки к нескольким объектам может привести к появлению другой проблемы — взаимоблокировки (англ. deadlocks), которая получается только если блокировка есть более чем на одном объекте. К тому же эта проблема тоже снижала бы производительность из-за многократной установки блокираторов.

GIL — это одиночный блокиратор самого интерпретатора Python. Он добавляет правило: любое выполнение байт-кода в Python требует блокировки интерпретатора. В таком случае можно исключить взаимоблокировку, т. к. GIL будет единственной блокировкой в приложении. К тому же его влияние на производительность процессора совсем не критично. Однако стоит помнить, что GIL уверенно делает любую программу однопоточной.

Несмотря на то, что GIL используется и в других интерпретаторах, например в Ruby, он не является единственным решением этой проблемы. Некоторые языки решают проблему потокобезопасного освобождения памяти с помощью сборки мусора.

Как справиться с GIL?

Если GIL у вас вызывает проблемы, вот несколько решений, которые вы можете попробовать:

Многопроцессорность против многопоточности. Довольно популярное решение, поскольку у каждого Python-процесса есть собственный интерпретатор с выделенной под него памятью, поэтому с GIL проблем не будет.

Корутины. О них на следующем занятии.

Python 3.13

В версии Python 3.13 планируются значительные изменения в работе GIL. Основные цели этих изменений:

  1. Улучшение поддержки параллельного выполнения: Новые изменения направлены на снижение ограничений, накладываемых GIL на многопоточные программы. В Python 3.13 будет внедрен новый механизм управления GIL, который позволит потокам более эффективно работать с разделяемыми данными и снизить время ожидания.

  2. Улучшение работы с многопроцессорными системами: Важное направление — это оптимизация работы на многоядерных процессорах. Внесенные изменения позволят более эффективно использовать ресурсы современных процессоров, уменьшая влияние GIL на производительность.

  3. Обратная совместимость и стабильность: Важным аспектом является то, что изменения будут внедряться так, чтобы минимизировать их влияние на существующий код и экосистему Python. Разработчики активно работают над тем, чтобы сохранить обратную совместимость и обеспечить плавный переход на новую версию.

Влияние на разработчиков

Изменения в GIL могут существенно повлиять на разработку многопоточных приложений на Python. Для разработчиков это означает:

  • Повышение производительности: Программы, использующие многопоточность, могут работать быстрее и эффективнее, особенно на многоядерных системах.
  • Упрощение параллелизма: Снижение ограничений GIL позволит разработчикам легче реализовывать параллельные алгоритмы без необходимости переключаться на другие языки или библиотеки для достижения высокой производительности.
  • Потенциальные обновления и оптимизации кода: Хотя изменения будут стремиться сохранить обратную совместимость, возможно, что некоторым разработчикам потребуется адаптировать код для полного использования новых возможностей.

Multiprocessing (Многопроцессорность)

Что такое многопроцессорная обработка Python?

Сначала поговорим о параллельной обработке. Это способ одновременно разбивать и запускать программные задачи на нескольких микропроцессорах. По сути, это попытка сократить время обработки и это то, чего мы можем достичь с помощью компьютера с двумя или более процессорами, или с использованием компьютерной сети. Мы также называем это параллельными вычислениями.

Итак, теперь перейдем к Python Multiprocessing, это способ повысить производительность путем создания параллельного кода. Производители процессоров делают это возможным, добавляя больше ядер к своим процессорам. В многопроцессорной системе приложения разбиваются на более мелкие подпрограммы для самостоятельной работы. Взгляните на однопроцессорную систему. Учитывая несколько процессов одновременно, он пытается прерывать и переключаться между задачами. Как бы вы себя чувствовали, будучи единственным шеф-поваром на кухне с сотнями клиентов? Вы должны были бы выполнять все обычные задачи от выпечки до замеса теста.

Когда это полезно?

  • Мультипроцессор – компьютер с несколькими центральными процессорами.

  • Многоядерный процессор – один вычислительный компонент с более чем одной независимой фактической единицей обработки/ядрами.

В любом случае процессор может выполнять несколько задач одновременно, назначая процессор для каждой задачи.

Пример:

from multiprocessing import Process


def square(n):
    print("Число в квадрате ", n ** 2)


def cube(n):
    print("Число в кубе", n ** 3)


if __name__ == "__main__":
    p1 = Process(target=square, args=(7,))
    p2 = Process(target=cube, args=(7,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("Мы закончили")

Отличие от многопоточности в том, что в этом случае каждый отдельный процесс будет выполняться отдельным ядром или процессором, и никак не блокируется GIL.

Но процедура создания нового процесса достаточно дорогостоящая, и нет никакого смысла создавать новый процесс для простых действий.

У каждого процесса есть id, название и т. д. эти данные всегда можно извлечь.

Блокировка процессов

Так же как и с потоками у нас может быть ситуация, когда разные процессы обрабатывают одни и те же данные, и чтобы быть уверенным, что действия не происходят одновременно, мы можем заблокировать процесс, синтаксис идентичен.

from multiprocessing import Process, Lock

lock = Lock()


def printer(item):
    lock.acquire()
    try:
        print(item)
    finally:
        lock.release()


if __name__ == "__main__":
    items = ['nacho', 'salsa', 7]
    for item in items:
        p = Process(target=printer, args=(item,))
        p.start()

Для многопроцессорности работают ровно те же самые блокировки, как и для многопоточности.

Пул вызовов

Пул - это возможность создать сразу необходимое количество процессов, а не делать это по одному. В данном примере мы сразу создаём 3 процесса для трех параллельных вычислений.

from multiprocessing import Pool


def double(n):
    return n * 2


if __name__ == '__main__':
    nums = [2, 3, 6]
    pool = Pool(processes=3)
    print(pool.map(double, nums))

Если нам необходимо вычислять одно действие на трёх процессорах, нам поможет функция apply_async():

from multiprocessing import Pool


def double(n):
    return n * 2


if __name__ == '__main__':
    pool = Pool(processes=3)
    result = pool.apply_async(double, (7,))
    print(result.get())

🧷 Потокобезопасные и процессобезопасные типы данных

Когда несколько потоков или процессов одновременно работают с одним и тем же объектом, важно, чтобы доступ к данным был безопасным. Для этого существуют специальные потокобезопасные (thread-safe) и процессобезопасные (process-safe) структуры.


🧵 Потокобезопасные структуры (threading)

В Python обычные структуры данных (list, dict) не являются потокобезопасными, если несколько потоков одновременно читают и пишут в них. Однако есть решения:

🔹 queue.Queue

Очередь, специально разработанная для безопасного доступа из нескольких потоков.

from queue import Queue
from threading import Thread

q = Queue()

def worker():
    while not q.empty():
        item = q.get()
        print(f"Got item: {item}")
        q.task_done()

for i in range(5):
    q.put(i)

threads = [Thread(target=worker) for _ in range(2)]
[t.start() for t in threads]
[t.join() for t in threads]

✔️ Без блокировок. Всё работает безопасно.


🧩 Какие ещё структуры являются потокобезопасными?

Структура Потокобезопасна? Комментарий
list, dict Только с внешними Lock, RLock
queue.Queue Классический вариант
collections.deque ✅ (частично) Безопасна для append, pop, но не все операции
queue.LifoQueue Стек (LIFO)
queue.PriorityQueue Приоритетная очередь

🧬 Процессобезопасные структуры (multiprocessing)

Когда используется multiprocessing, обычные объекты не делят память. Для обмена данными между процессами нужны:

🔹 multiprocessing.Queue

Безопасная очередь для процессов.

from multiprocessing import Process, Queue

def worker(q):
    while not q.empty():
        item = q.get()
        print(f"Process got: {item}")

if __name__ == "__main__":
    q = Queue()
    for i in range(5):
        q.put(i)

    processes = [Process(target=worker, args=(q,)) for _ in range(2)]
    [p.start() for p in processes]
    [p.join() for p in processes]

🔹 multiprocessing.Manager

Менеджер предоставляет общую память между процессами.

from multiprocessing import Manager, Process

def add_item(shared_list):
    shared_list.append("hello")

if __name__ == "__main__":
    with Manager() as manager:
        shared = manager.list()
        p = Process(target=add_item, args=(shared,))
        p.start()
        p.join()
        print(shared)

📊 Сравнение

Тип Поддержка в threading Поддержка в multiprocessing Потокобезопасность Процессобезопасность
list, dict ❌ (небезопасны)
queue.Queue
multiprocessing.Queue
Manager().list()

📝 Вывод

  • Для потоков используем queue.Queue и deque (частично).
  • Для процессовmultiprocessing.Queue и Manager.
  • list и dict не потокобезопасны — оборачивай их в Lock или используй менеджеры.