Python без блокувань. Як працюють потоки
«Бути „в потоці“ — це безумовно те, до чого варто прагнути. Я знаю, коли я там. Я підключаюся до чогось, що значно перевершує мої власні здібності»
Aleta Pippin, художниця-абстракціоністка
Привіт усім шукачам істини! Дана стаття є продовженням розмови про GIL та дослідження продуктивності Python для різних задач. На відміну від минулої, у цій статті зосередимося на практичній стороні питання. Буде більше коду та замірів продуктивності, побудованих на розуміннях, як працює GIL, та до чого тут потоки та процеси. Поговоримо про методи боротьби з GIL, хоча я радше назвала б це спробами подружитися з GIL, окреслимо способи зробити Python код більш ефективним та оптимальним.
— Гонки потоків
— Lock
— Deadlock
— Синхронізація потоків
— Черги потоків
— Інші способи синхронізації
— Як обрати кількість потоків
— Висновки
Тож насамперед раджу прочитати минулу частину, бо подальша інформація буде базуватися на попередньо викладеному матеріалі, що стане чудовим підґрунтям для досліджень потоків у Python. А надалі й процесів, бо наступна частина присвячена цій темі, що, безумовно, також раджу прочитати.
Частина 1: GIL у Python. Ключ до стабільності чи ворог продуктивності?
Частина 3: Розділяй і володарюй. Як працюють процеси в Python
Тож уявіть, що у вас є великий склад для зберігання товарів. Щоб підрахувати кількість кожного товару, вам потрібно виконати обчислення (інвентаризацію). Ви можете розділити роботу на кілька частин: одна група людей рахує загальну суму цін телефонів, інша — телевізорів, третя — холодильників. Кожна група працює незалежно, кожен занурений у свій процес. Підрахунок товарів вимагає обчислень, це складне завдання, де необхідно багато мозкового навантаження. Кожна група виконує свою роботу паралельно, а у результаті всі підрахунки будуть об’єднані в єдиний звіт суми цін товарів на складі.
Тепер уявіть, що товари зі складу потрібно відправити клієнтам. Одні працівники завантажують товари, інші — реєструють замовлення в системі, треті — пакують коробки. Вони працюють у бригадах, де кожна виконує завдання паралельно іншим, але їх робота не є важкими обчисленнями — це координація завантаження/вивантаження (вводу/виводу), ці потоки обробляють взаємодію з клієнтами та інвентарем. Вони виконують різну роботу, що є важливими частинами єдиного процесу, але не мають бути об’єднаними в єдиний звіт.
Це маленька візуалізація роботи потоків та процесів та сфери їхнього застосування. Потоки використовуються для задач із великою кількістю вводу-виводу (англ. I/O-bound tasks), наприклад, завантаження файлів, обробка вебзапитів, взаємодія з базами даних. Потоки працюють у межах одного процесу і ділять пам’ять. Вони не блокують GIL, тож можуть бути ефективні для паралельного виконання завдань.
Процеси використовуються для задач, що вимагають великих обчислень (англ. CPU-bound tasks), наприклад, обробка зображень, аналіз даних, машинне навчання. У попередній статті ми розглядали, як використання потоків для такого виду завдань не може допомогти виконати код швидше, ба більше — навіть уповільнить обчислення. Натомість процеси — чудовий інструмент для цього, і вони ізольовані, що дозволяє уникнути проблем із глобальним блокуванням інтерпретатора (GIL). Про процеси неодмінно поговоримо у наступній частині.
А зараз почнемо з потоків. Ми вже багато про них поговорили у контексті механізму роботи GIL, про їх природу та використання, продовжимо розкривати нові грані даної теми.
Потік — це найменша одиниця виконання програми. У Python потоки дозволяють виконувати кілька операцій одночасно, що може бути корисним для підвищення продуктивності, особливо в задачах вводу-виводу (англ. I/O tasks). У Python існує основний потік, що є головним потоком виконання програми, який створюється автоматично при запуску скрипта.
Гонки потоків
Ми починали підіймати тему пам’яті у контексті потоків, повернімося до прикладу, що вже розглядали.
У Python всі потоки в одному процесі ділять єдиний простір пам’яті. Це означає, що всі потоки можуть читати та змінювати одні й ті ж змінні, об’єкти та ресурси. Безумовно це чудове рішення, бо немає потреби налаштовувати додаткові механізми для передачі інформації, оскільки потоки автоматично мають доступ до одних і тих же змінних. До того ж це сприяє більш оптимальному використанню пам’яті, бо єдина пам’ять зменшує витрати на створення копій даних.
Але є «але». Через те, що кілька потоків одночасно мають доступ до однієї й тієї ж змінної, може виникнути конфлікт.
На зображенні показано приклад проблеми гонки потоків (англ. race condition), коли два потоки одночасно працюють зі спільною змінною ‘x’. У першому випадку потоки читають і оновлюють значення в правильному порядку, що дає результат ‘x = 12’. У другому випадку через перемикання контексту (див. у попередній частині статті) обидва потоки читають початкове значення ‘x = 10’, і результат стає некоректним — ‘x = 11’. Це демонструє проблему відсутності синхронізації між потоками.
Ба більше, через спільний простір пам’яті існує додаткова проблема з її очищенням. У Python кожен створений об’єкт має лічильник посилань, який відстежує кількість змінних або структур, що посилаються на об’єкт у пам’яті. Якщо лічильник посилань об’єкта зменшується до нуля, це означає, що на нього більше нічого не посилається, і пам’ять, яку він займав, автоматично звільняється. Така поведінка рано чи пізно спричинить проблеми із завчасно видаленими об’єктами або ж захаращенням вільного місця в купі (англ. heap) — пам’яті Python. Ознайомитися з темою пам’яті в Python рекомендую з однієї з моїх статей — «Мистецтво управління пам’яттю в Python: розуміння, використання та оптимізація».
Можемо розглянути трохи складніший приклад, де потоки виконують різні дії: перший — додавання чисел, другий — множення. У підсумку спостерігаємо ту саму проблему: потоки отримують кожен свій результат, що складно синхронізувати, програма веде себе непередбачувано від запуску до запуску через відсутність черги виконання потоків, та бачимо хаотичність отримання результатів через неконтрольовані перемикання.
Давайте повернемося до нашого більш простого прикладу зі звичайним інкрементом числа та подивимося, як це буде працювати на практиці.
import threading class Counter: def __init__(self): self.value = 0 def increase(self): self.value += 1 def work(counter: Counter, operations_count: int): for _ in range(operations_count): counter.increase() def run_threads(counter: Counter, count: int, operations_count: int): threads = [] for _ in range(count): t = threading.Thread(target=work, args=(counter, operations_count)) t.start() threads.append(t) for t in threads: t.join() if __name__ == '__main__': threads_count = 10 operations_per_thread_count = 1_000_000 counter = Counter() run_threads(counter, threads_count, operations_per_thread_count) excepted_result = threads_count * operations_per_thread_count actual_result = counter.value print(f'Очікуваний результат: {excepted_result}, актуальний результат: {actual_result}')
Побудуймо клас, що буде змінювати значення лічильника, інкрементуючи його, запустимо програму на 10 потоках і 1 мільйоні операцій для кожного та надрукуємо результат на екран. Очікуване значення — 10 000 000, а отримаємо ..? Хм, 10 000 000
Очікуваний результат: 10000000, актуальний результат: 10000000
Але чому? Працює аж 10 потоків, невже не виникло проблеми гонки потоків при виконанні мільйона додавань для кожного? Давайте подивимося результат, запустивши програму на різних версіях Python.
Bash скрипт для запуску:
#!/bin/bash PYTHON_VERSIONS=(3.7 3.8 3.9 3.10 3.11 3.12 3.13) VENV_DIR="./venvs" mkdir -p "$VENV_DIR" for VERSION in "${PYTHON_VERSIONS[@]}"; do VENV_PATH="$VENV_DIR/$VERSION" "python$VERSION" -m venv "$VENV_PATH" source "$VENV_PATH/bin/activate" python --version python race_condition.py deactivate done
Результат:
Python 3.7.8 Очікуваний: 10000000, актуальний: 5265961 Python 3.8.10 Очікуваний: 10000000, актуальний: 4040774 Python 3.9.13 Очікуваний: 10000000, актуальний: 3828965 Python 3.10.11 Очікуваний: 10000000, актуальний: 10000000 Python 3.11.9 Очікуваний: 10000000, актуальний: 10000000 Python 3.12.6 Очікуваний: 10000000, актуальний: 10000000 Python 3.13.1 Очікуваний: 10000000, актуальний: 10000000
Тож для версій, молодших за Python 3.10.x, проблема існує, для інших — все працює як належить. Чи не логічно припустити, що проблему гонки потоків було виправлено, починаючи з версії Python 3.10?
Не виправлено, але покращено. У Python 3.10 було введено оптимізацію, яка змінила спосіб роботи GIL. Раніше GIL міг звільнятися і захоплюватися на будь-якій байткод-інструкції, але тепер це відбувається лише на певних «спеціальних» байткод-інструкціях, які називаються «eval breakers».
Коли Python виконує наш код, він перекладає його у набір байткод-інструкцій. Якщо спростити, для нашого прикладу це виглядає якось так:
- LOAD_ATTR — завантажує значення змінної value;
- LOAD_CONST — завантажує константу 1;
- BINARY_OP — додає 1 до значення value;
- STORE_ATTR — зберігає результат у змінну value;
- JUMP_BACKWARD — перемикає на наступну ітерацію.
У
Інструкції, які можуть (але не обов’язково) викликати звільнення GIL, це JUMP_BACKWARD (або JUMP_ABSOLUTE, залежно від версій Python), яка виконує перехід на початок циклу і може перевіряти GIL у довгих циклах, або CALL (у версіях до Python 3.11 — CALL_FUNCTION), що може звільнити GIL у випадках, коли викликається функція C, I/O або складні обчислення. Байт-код інструкції для програми можна побачити за допомогою модуля dis у Python, що використовується для дизасемблювання (розбору) байткоду, який виконує інтерпретатор Python.
Я запускала так:
import dis dis.dis(Counter().increase) dis.dis(work)
Таким чином можна припустити, що при використанні Python 3.10.x та новіших ми не зустрінемося зі станом гонки, все ж працює.
Було б добре, але ні. Дійсно, для такого простого прикладу ця оптимізація позбавить нас перемикань між потоками між
Але якщо в коді між
Давайте це перевіримо. Допишемо у рядку додавання 1 до value будь-яку дію, що викличе, наприклад, CALL-інструкцію, здатну звільнити GIL, та запустимо на різних версіях Python знову.
Напишемо один з варіантів:
def increase(self): self.value += abs(1) або def increase(self): self.value += int(1) або def increase(self): self.value += round(1) або def increase(self): self.value += (lambda x: x + 1)(0) тощо
Подивимося результат:
Python 3.7.8 Очікуваний: 10000000, актуальний: 2049940 Python 3.8.10 Очікуваний: 10000000, актуальний: 2883001 Python 3.9.13 Очікуваний: 10000000, актуальний: 3843092 Python 3.10.11 Очікуваний: 10000000, актуальний: 4598112 Python 3.11.9 Очікуваний: 10000000, актуальний: 3595776 Python 3.12.6 Очікуваний: 10000000, актуальний: 5064150 Python 3.13.1 Очікуваний: 10000000, актуальний: 7388866
Запустивши знову dis.dis(Counter().increase), можемо перевірити, що порівняно з минулим рядом байткод-інструкцій у новому з’явилася CALL-інструкція, що і спричиняє «розірвання» операції додавання перемиканням потоків та призводить до стану гонки. У результаті чого, як бачимо вище, на жодній версії Python код не працює, як належить.
Назви інструкцій можуть різнитися залежно від версії Python, що ви використовуєте, раджу дивитися на версіях Python3.11 і старше.
Перший запуск без CALL-інструкції:
8 0 RESUME 0 9 2 LOAD_FAST 0 (self) 4 COPY 1 6 LOAD_ATTR 0 (value) 16 LOAD_CONST 1 (1) 18 BINARY_OP 13 (+=) 22 SWAP 2 24 STORE_ATTR 0 (value) 34 LOAD_CONST 0 (None) 36 RETURN_VALUE
Другий запуск з CALL-інструкцією для порівняння:
8 0 RESUME 0 9 2 LOAD_FAST 0 (self) 4 COPY 1 6 LOAD_ATTR 0 (value) 16 LOAD_GLOBAL 3 (NULL + int) 28 LOAD_CONST 1 (1) 30 PRECALL 1 34 CALL 1 44 BINARY_OP 13 (+=) 48 SWAP 2 50 STORE_ATTR 0 (value) 60 LOAD_CONST 0 (None) 62 RETURN_VALUE
Lock
Отже, гонка потоків. Що з цим робити? Питання стану гонки — це питання синхронізації потоків, тож треба діяти у цьому напрямку.
Для розв’язання цієї проблеми використовується threading.Lock. Це механізм, який дозволяє одному потоку отримати доступ до змінної, блокуючи інші потоки до завершення операції.
Рішенням використати threading.Lock ми гарантуємо, що тільки один потік виконує операцію додавання у певний момент часу.
Додамо у код використання threading.Lock:
class Counter: def __init__(self): self.value = 0 self.lock = threading.Lock() def increase(self): with self.lock: self.value += 1 . . .
Та подивимося результат:
Python 3.7.8 Очікуваний: 10000000, актуальний: 10000000 Python 3.8.10 Очікуваний: 10000000, актуальний: 10000000 Python 3.9.13 Очікуваний: 10000000, актуальний: 10000000 Python 3.10.11 Очікуваний: 10000000, актуальний: 10000000 Python 3.11.9 Очікуваний: 10000000, актуальний: 10000000 Python 3.12.6 Очікуваний: 10000000, актуальний: 10000000 Python 3.13.1 Очікуваний: 10000000, актуальний: 10000000
Використання with self.lock гарантує, що self.value += 1 виконається атомарно: поки один потік змінює value, інші чекають. Це усуває проблему гонки потоків, що можемо побачити за допомогою повторного запуску на різних версіях Python.
Deadlock
Добре, ми домоглися свого — працює! Тепер треба врахувати нові проблеми, що міг з собою принести Lock. Розглянемо новий приклад:
import threading class DeadLockExample: def __init__(self): self.lock = threading.Lock() def outer_method(self): with self.lock: print('Зовнішній метод залоковано') self.inner_method() def inner_method(self): with self.lock: print('Внутрішній метод залоковано') example = DeadLockExample() thread = threading.Thread(target=example.outer_method) thread.start() thread.join()
Коли запустимо, побачимо, що програма застрягає, тобто ми зіткнулися з проблемою deadlock.
Чому?
- потік захоплює self.lock у outer_method();
- викликається inner_method(), який теж намагається отримати self.lock, але він вже заблокований цим же потоком;
- Python блокує виконання, бо Lock не дозволяє повторне захоплення тим самим потоком.
Цьому існує цікаве рішення — threading.RLock (Reentrant Lock), який дозволяє одному і тому ж потоку захоплювати блокування кілька разів без блокування самого себе.
На відміну від звичайного Lock, який не дозволяє повторне захоплення тим самим потоком, RLock відстежує кількість разів, коли потік захопив блокування, і дозволяє його відпустити лише після відповідної кількості release(). У реалізації Lock методи acquire() та release() відповідають за захоплення та вивільнення ресурсів.
Давайте змінимо нашу програму, замінивши self.lock = threading.Lock() на self.lock = threading.RLock(). Запустивши, упевнимося, що все буде працювати.
RLock відстежує кількість разів, коли потік захопив блокування:
- перший раз acquire() отримує блокування;
- кожен наступний acquire() просто збільшує лічильник захоплень;
- release() зменшує лічильник, але звільняє локувальник тільки тоді, коли лічильник доходить до нуля.
Якщо зробити це вручну, то різниця буде виглядати так.
Використання Lock:
import threading lock = threading.Lock() lock.acquire() # Захоплення блокування lock.release() # Звільнення блокування
Використання RLock:
import threading rlock = threading.RLock() rlock.acquire() # Захоплюємо 1-й раз rlock.acquire() # Захоплюємо 2-й раз rlock.release() # Перший `release` rlock.release() # Другий `release`, тепер повністю розблоковано
Синхронізація потоків
Але тоді використання локування зводить нанівець сенс багатопоточності? Не зовсім. Lock та RLock корисні у тих точкових випадках, коли правильність обчислень важливіша за швидкість. Наприклад, у банківських операціях або в оновленнях даних в базі критично важливо уникати гонки потоків.
Але якщо ми хочемо зберегти високу продуктивність, варто розглянути інші підходи.
Черги потоків
queue.Queue — це потокобезпечна черга у Python, яка використовується для передачі даних між потоками. Вона дозволяє уникнути гонки потоків і керує синхронізацією доступу до даних між потоками без необхідності використовувати Lock.
Черга працює за принципом FIFO (First In, First Out) — перший доданий елемент буде першим витягнутий.
import threading import queue import time class ProducerConsumer: def __init__(self, num_consumers: int = 5, num_elements: int = 10): self.q = queue.Queue() self.producer_thread = threading.Thread(target=self._producer) self.consumer_threads = (threading.Thread(target=self._consumer) for _ in range(num_consumers)) self.num_elements = num_elements def _producer(self): for i in range(self.num_elements): self.q.put(i) # Додає елемент у чергу print(f'Додано {i}') time.sleep(0.1) # Імітація роботи def _consumer(self): while not self.q.empty(): item = self.q.get() # Отримує елемент print(f'Отримано {item}') self.q.task_done() # Позначає елемент як оброблений time.sleep(0.2) # Імітація обробки def run(self): self.producer_thread.start() time.sleep(0.5) # Даємо продюсеру час додати дані for t in self.consumer_threads: t.start() self.producer_thread.join() # Чекаємо завершення продюсера self.q.join() # Чекаємо, поки всі елементи будуть оброблені if __name__ == '__main__': pc = ProducerConsumer() pc.run()
Клас ProducerConsumer використовує чергу queue.Queue() для синхронізації потоків між продюсером і споживачами. Продюсер запускається в окремому потоці і додає в чергу 10 елементів. Після додавання кожного елемента він робить невелику паузу, що імітує процес обробки.
Споживачі запускаються в окремих потоках і працюють, поки в черзі є елементи. Вони отримують значення з черги, обробляють його та позначають як виконане. Після обробки також робиться невелика пауза, щоб імітувати час на виконання задачі.
У методі run() спочатку стартує продюсер, а потім із затримкою запускаються споживачі. Використовується join() для того, щоб дочекатися завершення роботи продюсера і всіх споживачів.
- q.put(i) додає елемент у чергу, забезпечуючи безпечний доступ із кількох потоків.
- q.get() забирає елемент із черги, блокуючи потік, якщо черга порожня (за замовчуванням).
Ці методи використовують внутрішні механізми блокування, що дозволяє безпечно передавати дані між потоками без необхідності додаткової синхронізації.
Існують й інші види черг:
- Queue: першим прийшов — першим вийшов (FIFO);
- Stack: останнім прийшов — першим вийшов (LIFO);
- Deque: двостороння черга — елементи додаються/видаляються з обох кінців;
- Priority Queue: пріоритетна черга — елементи обробляються за пріоритетом (від високого до низького).
Коли слід ухвалити рішення використовувати черги? Наприклад, коли один потік генерує дані, а інший їх обробляє (архітектура типу producer-consumer), або коли потрібно безпечно передавати дані між потоками, або коли Lock не допомагає. Насправді застосування може бути різноманітне і залежить від задачі, що розглядається, безпечна синхронізація між потоками доволі часто може стати у нагоді.
Інші способи синхронізації
Розглянемо інші способи синхронізації потоків та невеликі легковісні приклади для кожного. У попередній статті ми вже говорили про семафори, вони будуть першими. threading.Semaphore використовується для обмеження кількості потоків, які можуть виконувати певну операцію одночасно. Наприклад, якщо є ресурс, до якого може звертатися одночасно лише кілька потоків, Semaphore запобігає перевантаженню. Це корисно при підключеннях до бази даних, мережевих запитах або обмеженій кількості обчислювальних ресурсів. Кожен потік, що викликає acquire(), отримує доступ, якщо ще є вільні слоти, і блокується, якщо всі слоти зайняті. Після завершення роботи потік викликає release(), звільняючи ресурс.
Семафор дозволяє контролювати максимальну кількість потоків, що виконують критичну операцію. У цьому прикладі тільки 2 потоки можуть працювати одночасно.
import threading import time def worker(name: str): with sem: print(f'{name} отримав доступ') time.sleep(1) print(f'{name} звільнив доступ') sem = threading.Semaphore(2) # Дозволяє доступ лише 2 потокам одночасно threads = [threading.Thread(target=worker, args=(f'Потік-{i}',)) for i in range(5)] for t in threads: t.start() for t in threads: t.join()
threading.Condition дозволяє одним потокам чекати, поки інші завершать необхідні дії. Він використовується, коли один або більше потоків залежать від зміни стану в іншому потоці. Потоки, що очікують, викликають wait(), поки інший потік не викличе notify() або notify_all(). Це корисно, коли споживачі повинні чекати, поки продюсер підготує дані, або коли обробник даних повинен знати, що певні умови виконані. Condition дозволяє оптимізувати взаємодію між потоками та уникати активного очікування (англ. long polling), зупинити потоки, поки певна умова не буде виконана, і сповістити їх про продовження.
import threading import time class DataPipeline: def __init__(self): self.data_ready = False self.condition = threading.Condition() def produce(self): with self.condition: print('Producer генерує дані') time.sleep(1) self.data_ready = True self.condition.notify_all() # Сповіщає всі потоки, що дані готові def consume(self): with self.condition: while not self.data_ready: self.condition.wait() # Чекає на `notify_all()` print('Consumer отримав дані') pipeline = DataPipeline() threading.Thread(target=pipeline.consume).start() threading.Thread(target=pipeline.produce).start()
threading.Event використовується для сигналізації між потоками, коли один потік повинен чекати на дозвіл від іншого перед початком роботи. Потік, що чекає, викликає wait(), і він заблокований, поки інший потік не викличе set(), сигналізуючи, що можна продовжувати. Event зручно використовувати, коли необхідно контролювати запуск потоків або координацію їх виконання, наприклад, у ситуаціях, коли потік має почати роботу лише після отримання певного сигналу.
import threading import time event = threading.Event() def worker(): print('Очікую на сигнал') event.wait() # Чекає на set() print('Потік запущено!') thread = threading.Thread(target=worker) thread.start() time.sleep(2) event.set() # Дає дозвіл потоку працювати
concurrent.futures.ThreadPoolExecutor спрощує масовий запуск потоків, автоматично керуючи пулом потоків. Замість ручного створення та управління Thread, можна просто передати список завдань, і ThreadPoolExecutor розподілить їх між доступними потоками. Це дозволяє ефективно використовувати багатоядерні системи для задач, що містять очікування (наприклад, мережеві запити), без надмірного створення потоків. Завдяки map() та submit() можна легко запускати функції у паралельному режимі, отримуючи результати одразу або по завершенню кожного потоку.
from concurrent.futures import ThreadPoolExecutor def task(n: int) -> int: return n * 2 with ThreadPoolExecutor(max_workers=3) as executor: results = executor.map(task, range(5)) print(list(results)) # [0, 2, 4, 6, 8]
Як обрати кількість потоків
Залишилось цікаве питання: а як визначити оптимальну кількість потоків для конкретної задачі. Ба більше — обрати їх кількість так, щоб не нашкодити. Ми вже говорили про те, що потоки створюють накладні витрати, можуть викликати зворотний ефект — коли код стане працювати навпаки довше. Тому їх слід застосовувати уміло.
Насправді немає якогось правила, як визначити найкраще число потоків для запуску тої чи іншої програми. Я не знаю ніяких механізмів для встановлення оптимальної кількості та на просторах інтернету не знайшла такої інформації. Хоча б якось прослідкувати приріст продуктивності можна, лише запустивши програму на різних кількостях потоків, так би мовити, методом спроб та помилок, а точніше — експериментів.
Я розкажу про власне бачення, як порахувати найкращу кількість потоків для своєї програми. З цим, звісно, можна не погоджуватися.
Існує закон Літтла, що є фундаментальним принципом теорії черг і визначає середню кількість елементів у системі обслуговування. Він формулюється так:
де:
- L — середня кількість елементів у системі (клієнтів, процесів, задач);
- 𝜆 - середня швидкість надходження елементів (інтенсивність потоку заявок);
- 𝑊 - середній час перебування елемента в системі (час очікування + обслуговування).
Припустимо, що в магазин приходить в середньому 10 клієнтів на годину (𝜆 = 10), і кожен проводить там півгодини (𝑊 = 0.5). Тоді середня кількість клієнтів у магазині:
Тобто в будь-який момент у магазині в середньому 5 клієнтів. Якщо у нас є багатопотокова система, де кожен потік обробляє завдання певний час, закон Літтла може допомогти оцінити оптимальну кількість потоків, необхідних для ефективного обслуговування запитів.
Якщо ми маємо задачу, де процес обчислює щось за T секунд обчислення, а потім чекає на завершення операції (наприклад, на відповідь сервера) T секунд очікування, це означає, що процесор більшу частину часу простоює.
Щоб приховати цей час очікування та максимально навантажити процесор, потрібно запустити стільки потоків, щоб під час очікування одного потоку інші могли виконувати обчислення.
Отже, оптимальна кількість потоків Nопт повинна бути такою, щоб під час очікування одного потоку CPU встигав повністю обробити інші потоки:
де:
- Nопт — оптимальна кількість потоків;
- Tобч — середній час виконання операцій (не рахуючи очікування);
- Tочік — середній час очікування (наприклад, відповіді сервера).
Припустимо, що:
- обробка даних займає 50 мс (Tобч = 0.05 с);
- очікування відповіді сервера — 450 мс (Tочік = 0.45 с).
Тоді:
Отже, оптимально запустити 10 потоків, щоб CPU був завантажений на максимум.
Коли формула може не спрацювати? Вузьке місце у ресурсах (наприклад, сервер блокує одночасні запити), або велике навантаження на пам’ять (якщо кожен потік займає багато RAM). Також у ситуації обмеження системи (якщо Python має обмеження на потоки або використовується GIL). У таких випадках варто експериментувати з профілюванням продуктивності.
Висновки
Отже, потоки в Python — це потужний інструмент для вирішення завдань, що потребують багатозадачності. Хоча обмеження GIL може стати перешкодою для деяких сценаріїв, правильне розуміння його роботи дозволяє обрати оптимальний підхід для конкретної задачі: використання потоків для I/O-операцій, процесів — для обчислювально-інтенсивних завдань або асинхронного програмування — для максимальної ефективності. Я не торкаюся теми асинхронності у даній серії статей, проте вона є чудовим інструментом для оптимізації багатозадачності.
Таким чином, потоки в Python корисні для паралельного виконання I/O-задач, проте через GIL потоки не дають справжнього паралелізму для CPU-навантажених задач, і в таких випадках краще використовувати процеси.
Частина 1: GIL у Python. Ключ до стабільності чи ворог продуктивності?
Частина 3: Розділяй і володарюй. Як працюють процеси в Python
Про це й поговоримо далі — в заключній частині даної серії про багатозадачність. А всім, хто ще не ознайомився з минулою статтею про GIL, раджу повернутися до неї. Сподіваюсь, було корисно та цікаво. Дякую за увагу! Не прощаємось.
11 коментарів
Додати коментар Підписатись на коментаріВідписатись від коментарів