Виконуємо синхронний код в асинхронному середовищі

Підписуйтеся на Telegram-канал «DOU #tech», щоб не пропустити нові технічні статті

Привіт, мене звуть Марк, я Back-end Engineer в компанії Welltech і сьогодні ми поговоримо про те, як одночасно запустити багато функцій. І сфокусуємось ми на одночасному виконанні синхронного IO bound коду.

Хочу нагадати, що майже 10 років тому в Python 3.4 було додано модуль asyncio авторства розробника з України Андрія Свєтлова.

Почнемо з простого та канонічного прикладу того, як в сучасному Python асинхронно виконати декілька IO bound функцій:


import asyncio

async def io_bound_task(arg):
    await asyncio.sleep(3)  # емулюємо IO 
    return f"Result for {arg}"

async def main():
    tasks = [io_bound_task(arg) for arg in range(10)]
    return await asyncio.gather(*tasks)
    
asyncio.run(main())

Такий підхід гарно підходить, якщо ви запускаєте асинхронний код. Але часто так буває, що цільовий код, який ви хочете виконати умовно паралельно, не є асинхронним.

Поширеним прикладом такої ситуації є використання бібліотек-клієнтів якогось API. Ось простий приклад:


def collect_account_data(acc_id):
    # do some request preparation or data extraction here
    return client.get_account(acc_id)  # sync IO bound code

def main(accounts_ids):
   return [collect_account_data(acc_id) for acc_id in accounts_ids]

Наш код працює, але працює синхронно і тому повільно, і ми хочемо, щоб функція collect_account_data виконувалась умовно паралельно для декількох акаунтів.

Раніше така задача вирішувалась за допомогою модуля threading. Але в сучасному Python є альтернатива — це використання asyncio.to_thread() та loop.run_in_executor() в asyncio.

Тож перепишімо функцію main так, щоб collect_account_data виконувалась для декількох акаунтів одночасно:


import asyncio

def main(accounts_ids):
    async def collect_data_async():
        tasks = [asyncio.to_thread(collect_account_data, acc_id) for acc_id in accounts_ids]
        return await asyncio.gather(*tasks)
    
    return asyncio.run(collect_data_async())

В цьому прикладі asyncio.run виконує основну роботу. Вона створює новий event loop, виконує передану корутину і потім закриває event loop.

Тепер розгляньмо 2 модифікації нашої задачі. Перша варіація — в нашому third party API реалізовано тротлінг, і під час виконання нашого коду ми отримуємо помилку на кшталт «Too Many Requests».

Щоб вирішити таку проблему, ми можемо явно створити ThreadPoolExecutor і вказати максимальну кількість одночасних потоків.

Тепер наш код буде виглядати так:


from concurrent.futures import ThreadPoolExecutor

def main(accounts_ids):
    max_concurency = 5  # adjust the value due to rate limit policy

    loop = asyncio.get_running_loop()

    functions_to_run = []
    with ThreadPoolExecutor(max_workers=max_concurency) as executor:
        for acc_id in accounts_ids:
            functions_to_run.append(
                loop.run_in_executor(executor, collect_account_data, acc_id)
            )
    results = loop.run_until_complete(asyncio.gather(*functions_to_run))
    loop.close()
    return results

Розберімося, що тут відбувається. Функція loop.run_in_executor запускає collect_account_data в окремому потоці. Функція loop.run_in_executor першим параметром приймає інстанс executor, і якщо він не був переданий, то неявно створюється executor дефолтного класу, тобто ThreadPoolExecutor.

Примітка: у цьому прикладі ми самі створюємо event loop і це означає що нам потрібно його закрити після виконання задач. Це бажано робити для коректного завершення роботи програми та для звільнення ресурсів, які використовуються в задачах, таких як файли, мережеві з’єднання тощо. В наступних прикладах я пропускаю закриття event loop для лаконічності прикладів.

В цьому оновленому прикладі наш код виконує не більше 5 одночасних запитів до API. Але що робити, якщо перед нами стоїть протилежна задача: тротлінгу немає, у нас сотні чи тисячі ID в масиві accounts_ids і нам потрібно прискорити виконання даного коду.

Що ж, немає нічого простішого ніж збільшити max_workers, наприклад до декількох сотень і наш код запрацює в декілька разів швидше. Річ у тім, що за замовчуванням ThreadPoolExecutor створює кількість воркерів (потоків), що дорівнює кількості CPU + 4.

Наступний варіант ускладнення нашої задачі — це необхідність додати таймаут до виконання кожної функції в потоці. Для максимальної простоти прикладів ми відійдемо від використання third party API.


import asyncio
import time

def sync_function(arg):
    time.sleep(random.choice(range(1, 10)))
    return f"Result for {arg}"

async def run_sync_function_with_timeout(arg):
    timeout_seconds = 2
    try:
        return await asyncio.wait_for(asyncio.to_thread(sync_function, arg), timeout_seconds)
    except asyncio.TimeoutError:
        return None

def main():
    loop = asyncio.get_running_loop()

    tasks = [run_sync_function_with_timeout(arg) for arg in range(10)]
    results = loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
    return [res for res in results if res is not None]

В цьому прикладі функція main поверне результати виконання sync_function для всіх функцій що встигли закінчити роботу до таймауту. Тут варто зазначти, що після того, як ми отримуємо результат функції main, функції які ще не завершили виконання продовжують свою роботу.

Наступний варіант задачі — загальний таймаут, який обмежує час виконання функції main.


def sync_function(arg):
    time.sleep(random.choice(range(1, 10)))
    return f"Result for {arg}"

def main(timeout=3):
    loop = asyncio.get_running_loop()

    tasks = [asyncio.to_thread(sync_function, arg) for arg in range(10)]

    try:
        loop.run_until_complete(asyncio.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout))
    except asyncio.TimeoutError:
        pass

Недоліком такого рішення є те, що функція main не повертає результати виконання sync_function.

Виправляємо це у наступному прикладі:


def main(timeout=3):
    loop = asyncio.get_running_loop()
    tasks = [loop.run_in_executor(None, sync_function, i) for i in range(10)]
    loop.run_until_complete(asyncio.wait(tasks, timeout=timeout)
    return [task.result() for task in tasks if task.done()]

Ще раз нагадаю, що у всіх наведених прикладах з таймаутом ті функції, що «не вклались» в таймаут продовжують працювати після таймауту. Але інколи нам треба зупинити роботи всіх функцій одразу після настання таймауту.

Перше, що спадає на думку для вирішення такою ситуації — зупинити всі потоки з незакінченими задачами. Але це погана ідея. Реалізувати таку задачу не так просто, тому що:

  1. Python не надає інструменти для її вирішення out of the box;
  2. примусове переривання потоку може порушити консистентність даних та призвести до resource leaking та deadlocks.

Тому якщо вам дійсно потрібно швидко зупиняти виконання задач, то для цього доведеться використовувати модуль multiprocessing.

Все ж таки повернемось до asyncio. Якщо ваша цільова функція робить декілька ітерацій з IO викликами, то можна додати логіку припинення виконання функції всередину цільової функції, використовуючи глобальний флаг.


continue_execution = True

def sync_function(arg):
    global continue_execution

    for i in range(10):
        if not continue_execution:
            break
        time.sleep(arg * 0.1)
    print(f"Finished task for {arg}")
    return f"Result for {arg}"

def main():
    global continue_execution

    loop = asyncio.get_running_loop()

    tasks = [loop.run_in_executor(None, sync_function, i) for i in range(20)]

    done, pending = loop.run_until_complete(asyncio.wait(tasks, timeout=3))

    if pending:
        continue_execution = False

    # this allows to cancel tasks which were not passed to thread yet
    for task in pending:
        task.cancel()

    return [task.result() for task in done]

Обробка помилок

Тепер трохи поговоримо про обробку помилок. При використанні ThreadPoolExecutor перша помилка всередині треду зупинить виконання всього коду. Однакова поведінка буде як при явному використанні ThreadPoolExecutor, так і при неявному через loop.run_in_executor або asyncio.to_thread.

Треба зазначити, що при помилці зупинка коду відбувається не миттєво, а після закінчення роботи пулу, в якому була помилка. Скоріше за все, це не те, що вам потрібно і ви очікуєте, що помилка в одній з функцій не зупинить виконання інших функцій.

На мою думку, найпростіший спосіб вирішити таку задачу — це огорнути весь код цільової функції в try/except блок або використовувати окрему функцію обгортку. Найпростіший варіант виглядатиме так:


def sync_function(arg):
    try:
        time.sleep(1)
        if random.random() > 0.5:
            1/0
        return arg * arg
    except Exception:
        return None
 
def main():
    async def async_function():
        tasks = [asyncio.to_thread(sync_function, arg) for arg in range(10)]
        return await asyncio.gather(*tasks)
    
    return [r for r in asyncio.run(async_function()) if r is not None]

Або такий більш гнучкий варіант:


def sync_function(arg):
    time.sleep(1)
    if random.random() > 0.5:
        1/0
    return arg * arg

def sync_function_with_error_handling(arg):
    try:
        return {"result": sync_function(arg), "error": None}
    except Exception as err:
        # do some extra error handling here
        return {"result": None, "error": err}
 
def main():
    async def async_function():
        tasks = [asyncio.to_thread(sync_function_with_error_handling, arg) for arg in range(10)]
        return await asyncio.gather(*tasks)
    
    results = asyncio.run(async_function())
    return [r['result'] for r in results if r.get('result')]

Звичайно, якщо ваша функція-обгортка повторюється для різних функцій, то буде розумно замінити її декоратором.

У цій статті ми розглянули одночасне виконання синхронного IO-bound коду в асинхронному середовищі Python за допомогою модуля asyncio. Показані приклади демонструють різні сценарії використання, включаючи потенційну взаємодію зі сторонніми API, обмеження кількості одночасних потоків виконання та обробку помилок.

Застосування loop.run_in_executor() разом з ThreadPoolExecutor дозволяє ефективно розпаралелити виконання синхронного коду та отримати значний приріст у швидкості виконання.

👍ПодобаєтьсяСподобалось8
До обраногоВ обраному6
LinkedIn
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter

Простіше відправити повідомлення в чергу (RabbitMQ, Nats, Kafka), ніж займатись подібним у Python та PHP.

Або ж використовуйте мови, які мають багатопоточність, прикладу Go, Rust, Java, Scala та Elixir.

Простіше вийде чи ні, в даному випадку, дуже залежить від контексту.

Можете описати, які переваги даного підходу перед використанням звичайних тредів?

Насправді різниця не дуже велика. Коли ми явно чи не явно створюємо ThreadPoolExecutor, то він створює саме інстанси threading.Thread як свої воркери. Різниця в коді сводиться до того, що код в більшості випадків виходить більш лаконіним, так як не треба менеджити треди явно. Також з відміностей те, як обробляються помилки: при використані asyncio.to_thread помилки одразу пробрасуются «на гору». Правда це не завжди бажана поведінка.

Якщо ж подивитись більш глобально, то для мене головний плюс це можливість явно не змішувати використання asyncio/threading в одному проєкті чи модулі. Так як asyncio і threading зазвичай розглядаються як «конкуренти», то явне одночасне використання обох цих модулів виглядає як безлад. Тобто якщо десь вже використовується asyncio, але треба умовно розпаралелити синхронний код, то використання саме asyncio.to_thread виглядає більш логічним. Напевно це суб’єктивно, але все ж.

Методи класні, єдине що у мене були проблеми при огортанні запитів до баз даних, там треба явно створювати чергу, бо чомусь pymysql входив в race condition і дані почали спотворюватись.
Ідея була, що треба постійно писати дані в MySQL і я намагався огорнути окремі виклики на запис саме так, але виявилось, що все ж краще через синхронний доступ або створювати чергу з запитів.

У мене кілька питань, але я почну з першого:
А нахіба тут взагалі потрібно «У наступних прикладах ми часто будемо явно створювати event loop»?

Інколи є current event loop, інколи немає. Для того щоб код працював в обох цих випадках створюємо новий.

Шедевральна відповідь, дякую....
Мало того, що замість використання high-level api, в статті використовується low-level, що тільки ускладнює розуміння коду, так воно ще й з помилкою

На мою особисту думку всю цю статтю можно замінити двома лінками на документацию (docs.python.org/...​sk.html#asyncio.to_thread, docs.python.org/...​ncio.loop.run_in_executor), і не ускладнювати людям розуміння тим, що їм навіть не потрібно для виконання задачі, що вказана в назві цієї статті.

Що ж, думаю Ви праві і явне створення все ж таки зайве в прикладах. Оновлю їх.

Я тепер не розумію про що стаття....

import asyncio

def main(accounts_ids):
async def collect_data_async():
tasks = [asyncio.to_thread(collect_account_data, acc_id) for acc_id in accounts_ids]
return await asyncio.gather(*tasks)

return asyncio.run(collect_data_async())

Нащо в синхронному коді, огортати синхронний код в асинхронну бібліотеку, що б використати treading?

Напишіть авторам пітону, хай видаляють to_thread.

Я сподівався що зрозуміло що стаття орієнтована на тих, в кого ще немає навички читати документацію, або чий рівень англійської недостатній щоб робити це ефективно.

Підписатись на коментарі