Виконуємо синхронний код в асинхронному середовищі
Підписуйтеся на Telegram-канал «DOU #tech», щоб не пропустити нові технічні статті
Привіт, мене звуть Марк, я Back-end Engineer в компанії Welltech і сьогодні ми поговоримо про те, як одночасно запустити багато функцій. І сфокусуємось ми на одночасному виконанні синхронного IO bound коду.
Хочу нагадати, що майже 10 років тому в Python 3.4 було додано модуль asyncio авторства розробника з України Андрія Свєтлова.
Почнемо з простого та канонічного прикладу того, як в сучасному Python асинхронно виконати декілька IO bound функцій:
import asyncio async def io_bound_task(arg): await asyncio.sleep(3) # емулюємо IO return f"Result for {arg}" async def main(): tasks = [io_bound_task(arg) for arg in range(10)] return await asyncio.gather(*tasks) asyncio.run(main())
Такий підхід гарно підходить, якщо ви запускаєте асинхронний код. Але часто так буває, що цільовий код, який ви хочете виконати умовно паралельно, не є асинхронним.
Поширеним прикладом такої ситуації є використання бібліотек-клієнтів якогось API. Ось простий приклад:
def collect_account_data(acc_id): # do some request preparation or data extraction here return client.get_account(acc_id) # sync IO bound code def main(accounts_ids): return [collect_account_data(acc_id) for acc_id in accounts_ids]
Наш код працює, але працює синхронно і тому повільно, і ми хочемо, щоб функція collect_account_data виконувалась умовно паралельно для декількох акаунтів.
Раніше така задача вирішувалась за допомогою модуля threading. Але в сучасному Python є альтернатива — це використання asyncio.to_thread() та loop.run_in_executor() в asyncio.
Тож перепишімо функцію main так, щоб collect_account_data виконувалась для декількох акаунтів одночасно:
import asyncio def main(accounts_ids): async def collect_data_async(): tasks = [asyncio.to_thread(collect_account_data, acc_id) for acc_id in accounts_ids] return await asyncio.gather(*tasks) return asyncio.run(collect_data_async())
В цьому прикладі asyncio.run виконує основну роботу. Вона створює новий event loop, виконує передану корутину і потім закриває event loop.
Тепер розгляньмо 2 модифікації нашої задачі. Перша варіація — в нашому third party API реалізовано тротлінг, і під час виконання нашого коду ми отримуємо помилку на кшталт «Too Many Requests».
Щоб вирішити таку проблему, ми можемо явно створити ThreadPoolExecutor і вказати максимальну кількість одночасних потоків.
Тепер наш код буде виглядати так:
from concurrent.futures import ThreadPoolExecutor def main(accounts_ids): max_concurency = 5 # adjust the value due to rate limit policy loop = asyncio.get_running_loop() functions_to_run = [] with ThreadPoolExecutor(max_workers=max_concurency) as executor: for acc_id in accounts_ids: functions_to_run.append( loop.run_in_executor(executor, collect_account_data, acc_id) ) results = loop.run_until_complete(asyncio.gather(*functions_to_run)) loop.close() return results
Розберімося, що тут відбувається. Функція loop.run_in_executor запускає collect_account_data в окремому потоці. Функція loop.run_in_executor першим параметром приймає інстанс executor, і якщо він не був переданий, то неявно створюється executor дефолтного класу, тобто ThreadPoolExecutor.
Примітка: у цьому прикладі ми самі створюємо event loop і це означає що нам потрібно його закрити після виконання задач. Це бажано робити для коректного завершення роботи програми та для звільнення ресурсів, які використовуються в задачах, таких як файли, мережеві з’єднання тощо. В наступних прикладах я пропускаю закриття event loop для лаконічності прикладів.
В цьому оновленому прикладі наш код виконує не більше 5 одночасних запитів до API. Але що робити, якщо перед нами стоїть протилежна задача: тротлінгу немає, у нас сотні чи тисячі ID в масиві accounts_ids і нам потрібно прискорити виконання даного коду.
Що ж, немає нічого простішого ніж збільшити max_workers, наприклад до декількох сотень і наш код запрацює в декілька разів швидше. Річ у тім, що за замовчуванням ThreadPoolExecutor створює кількість воркерів (потоків), що дорівнює кількості CPU + 4.
Наступний варіант ускладнення нашої задачі — це необхідність додати таймаут до виконання кожної функції в потоці. Для максимальної простоти прикладів ми відійдемо від використання third party API.
import asyncio import time def sync_function(arg): time.sleep(random.choice(range(1, 10))) return f"Result for {arg}" async def run_sync_function_with_timeout(arg): timeout_seconds = 2 try: return await asyncio.wait_for(asyncio.to_thread(sync_function, arg), timeout_seconds) except asyncio.TimeoutError: return None def main(): loop = asyncio.get_running_loop() tasks = [run_sync_function_with_timeout(arg) for arg in range(10)] results = loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True)) return [res for res in results if res is not None]
В цьому прикладі функція main поверне результати виконання sync_function для всіх функцій що встигли закінчити роботу до таймауту. Тут варто зазначти, що після того, як ми отримуємо результат функції main, функції які ще не завершили виконання продовжують свою роботу.
Наступний варіант задачі — загальний таймаут, який обмежує час виконання функції main.
def sync_function(arg): time.sleep(random.choice(range(1, 10))) return f"Result for {arg}" def main(timeout=3): loop = asyncio.get_running_loop() tasks = [asyncio.to_thread(sync_function, arg) for arg in range(10)] try: loop.run_until_complete(asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout)) except asyncio.TimeoutError: pass
Недоліком такого рішення є те, що функція main не повертає результати виконання sync_function.
Виправляємо це у наступному прикладі:
def main(timeout=3): loop = asyncio.get_running_loop() tasks = [loop.run_in_executor(None, sync_function, i) for i in range(10)] loop.run_until_complete(asyncio.wait(tasks, timeout=timeout) return [task.result() for task in tasks if task.done()]
Ще раз нагадаю, що у всіх наведених прикладах з таймаутом ті функції, що «не вклались» в таймаут продовжують працювати після таймауту. Але інколи нам треба зупинити роботи всіх функцій одразу після настання таймауту.
Перше, що спадає на думку для вирішення такою ситуації — зупинити всі потоки з незакінченими задачами. Але це погана ідея. Реалізувати таку задачу не так просто, тому що:
- Python не надає інструменти для її вирішення out of the box;
- примусове переривання потоку може порушити консистентність даних та призвести до resource leaking та deadlocks.
Тому якщо вам дійсно потрібно швидко зупиняти виконання задач, то для цього доведеться використовувати модуль multiprocessing.
Все ж таки повернемось до asyncio. Якщо ваша цільова функція робить декілька ітерацій з IO викликами, то можна додати логіку припинення виконання функції всередину цільової функції, використовуючи глобальний флаг.
continue_execution = True def sync_function(arg): global continue_execution for i in range(10): if not continue_execution: break time.sleep(arg * 0.1) print(f"Finished task for {arg}") return f"Result for {arg}" def main(): global continue_execution loop = asyncio.get_running_loop() tasks = [loop.run_in_executor(None, sync_function, i) for i in range(20)] done, pending = loop.run_until_complete(asyncio.wait(tasks, timeout=3)) if pending: continue_execution = False # this allows to cancel tasks which were not passed to thread yet for task in pending: task.cancel() return [task.result() for task in done]
Обробка помилок
Тепер трохи поговоримо про обробку помилок. При використанні ThreadPoolExecutor перша помилка всередині треду зупинить виконання всього коду. Однакова поведінка буде як при явному використанні ThreadPoolExecutor, так і при неявному через loop.run_in_executor або asyncio.to_thread.
Треба зазначити, що при помилці зупинка коду відбувається не миттєво, а після закінчення роботи пулу, в якому була помилка. Скоріше за все, це не те, що вам потрібно і ви очікуєте, що помилка в одній з функцій не зупинить виконання інших функцій.
На мою думку, найпростіший спосіб вирішити таку задачу — це огорнути весь код цільової функції в try/except блок або використовувати окрему функцію обгортку. Найпростіший варіант виглядатиме так:
def sync_function(arg): try: time.sleep(1) if random.random() > 0.5: 1/0 return arg * arg except Exception: return None def main(): async def async_function(): tasks = [asyncio.to_thread(sync_function, arg) for arg in range(10)] return await asyncio.gather(*tasks) return [r for r in asyncio.run(async_function()) if r is not None]
Або такий більш гнучкий варіант:
def sync_function(arg): time.sleep(1) if random.random() > 0.5: 1/0 return arg * arg def sync_function_with_error_handling(arg): try: return {"result": sync_function(arg), "error": None} except Exception as err: # do some extra error handling here return {"result": None, "error": err} def main(): async def async_function(): tasks = [asyncio.to_thread(sync_function_with_error_handling, arg) for arg in range(10)] return await asyncio.gather(*tasks) results = asyncio.run(async_function()) return [r['result'] for r in results if r.get('result')]
Звичайно, якщо ваша функція-обгортка повторюється для різних функцій, то буде розумно замінити її декоратором.
У цій статті ми розглянули одночасне виконання синхронного IO-bound коду в асинхронному середовищі Python за допомогою модуля asyncio. Показані приклади демонструють різні сценарії використання, включаючи потенційну взаємодію зі сторонніми API, обмеження кількості одночасних потоків виконання та обробку помилок.
Застосування loop.run_in_executor() разом з ThreadPoolExecutor дозволяє ефективно розпаралелити виконання синхронного коду та отримати значний приріст у швидкості виконання.
12 коментарів
Додати коментар Підписатись на коментаріВідписатись від коментарів