Швидко та недорого покращуємо перфоманс невеликого Python-проєкту

Привіт! Мене звати Олег Качур, я — Python-розробник у компанії Yalantis. Протягом своєї кар’єри я працював над різноманітними проєктами, від малих до середніх, як внутрішніх сервісів, досліджень і розробок, так і комерційних продуктів.

Сьогодні я хотів би поділитися цікавим досвідом з практики — рефакторингом невеликого проєкту з мінімальним залученням додаткових ресурсів. Також я розгляну переваги та недоліки альтернативних шляхів розв’язання схожих проблем.

Опис проблеми

Певний час тому до моїх рук потрапив невеликий DRF-проєкт (Django Rest Framework) — сервіс, що генерує репорти на основі даних з 3rd-party-сервісів, що значно зменшує рутину та зберігає час великому відділу компанії на більш творчі та цікаві задачі.

Під час функціонального та acceptance-тестування отримали репорти із зауваженнями про довгий час генерації репорту за період часу 6+ місяців.

Оскільки до сервісу долучалися нові й нові користувачі з новими патернами використання, вирішили переглянути NFR та пошукати можливість прискорення генерації репортів, але з мінімальними зусиллями від розробницької команди/відсутності інфраструктурних змін.

Отже, під час ресерчу переглянув ядро генерації репортів, де побачив картину на кшталт:

def generate_report():
    report_data = {}
    report_data['section_1'] = prepare_section_1()
    report_data['section_2'] = prepare_section_2()
    report_data['section_3'] = prepare_section_3()
    report_data['section_4'] = prepare_section_4()
    report_data['custom_section'] = prepare_custom_section()
    return report_data

Примітка: тут функція наведена без аргументів, оскільки вони не мають відношення до нашого топіку (там присутні start/end date та параметри генерації).

Також варто зазначити, що запити виконуються за допомогою готового клієнту, що надається вендором.

Логічна побудова типової стандартного репорту:

def prepare_section_N()
    query_1 = build_third_party_query_1()
    data_1 = execute_query(query_1)
    process_and_save_data_1(data_1)
    …
    query_N = build_third_party_query_N()
    data_N = execute_query(query_N)
    process_and_save_data_N(data_N)

Як бачимо, кожна секція типово складається з побудови запиту на third-party, власне запиту, обробки результатів та збереження в базу, і так декілька разів. В реальному коді присутня додаткова логіка, щодо того, які саме запити будувати залежно від інпут-параметрів.

Тобто під час генерації відбувається купа типових послідовних циклів запитів на third-party з їхньою подальшою обробкою та зберіганням. Коли збільшилась кількість даних та користувачі почали обирати більші періоди генерації, лаг став помітним.

За синтетичними даними на тестовому датасеті, час генерації окремих репортів досягав 25-30 секунд.

Очевидно, що подібні сценарії з’явилися після певного часу після запуску сервісу, користувачі та навантаження збільшувались поступово, тож наявний код працював коректно.

Отже, за результатами ресерчу визначились, що ботлнек полягає у головній функції побудови репорту, що послідовно робить значну кількість послідовних запитів на third-party API.

Спробуємо розглянути можливі шляхи вирішення, не забуваючи, що обмежені в ресурсах годин розробницької та інфраструктурної команд.

Розвʼязання проблеми

😅 Перш ніж переписати все на Rust, я вирішив, що варто розглянути різні способи розв’язання цієї досить типової проблеми.

Чи правильно ми користуємось API?

Перш ніж використати нову меджик-технологію, в загальному випадку, буде доречно переглянути алгоритм роботи з third-party API. Поставити собі питання:

  1. Які саме дані потрібно отримати?
  2. Чи не отримуємо зайві дані?
  3. Чи не має запитів, що дублюються та які повертають однакову інформацію, але викликаються декілька разів?
  4. Можливо деякі записи можна отримувати батчами, а не по одному?
  5. Чи можна поєднати щось в один запит та скоротити загальну кількість запитів?

В нашому випадку, через специфічні бізнес-вимоги, не вдалось скоротити кількість запитів, але я всім рекомендую починати думати в напрямку оптимізації, саме з логіки роботи з third-party API.

Варіант 1. Celery або аналогічне рішення

Додати celery та огорнути послідовні запити як таски — напевно перше, що прийшло мені на думку.

Переваг у такого підходу дійсно чимало:

  • інтеграція з Django;
  • багато зручних примітивів у canvas;
  • вбудований rate-limiter;
  • багатий тулінг та база знань/рішень.

Є і певні недоліки, особливо в контексті нашої задачі.

Якщо обгортати наявний код тасками, то це буде дуже далеко від best practices. До коду тасок є певні вимоги (мають бути короткими, атомарними та ідемпотентними), тобто, щоб їх використати, треба добряче порефакторити наш код.

Також є питання щодо серіалізації даних, додаткові інфраструктурні кости, подальша підтримка та моніторинг.

Загалом це популярне та робоче рішення, але для нашого випадку це оверкіл, та й не підходить під обмеження наведені в першому розділі.

Варіант 2. Синхронізація/реплікація бази

Іноді взагалі можна відмовитись від використання API та працювати з синхронізованою копією або реплікою бази, будувати ефективні запити до власної бази, без використання API. Оскільки дані тепер під рукою, це відкриває широкі можливості по гнучкості запитів та їхньої оптимізації.

З обмежень — це певний лаг, щодо актуальних даних (залежить від деталей реалізації), та звісно, питання доступу до самої бази: мало сервісів готові подібний доступ надати, або потрібно якимось чином підтягувати дані з third-party-бази.

З власного досвіду мушу зазначити, що це д-у-у-же екзотичний або дуже костильний варіант, але все одно вирішив про нього згадати.

В нашому випадку не було єдиної бази third-party-сервісів, а також ми не маємо доступу до бази напряму, тому таке рішення одразу відкинули.

Варіант 3. Використати asyncio

Починаючи з версії python 3.4, asyncio є частиною стандартної бібліотеки.

Документація нам підказує «asyncio is a library to write concurrent code using the async/await syntax» та «asyncio is often a perfect fit for IO-bound and high-level structured network code».

Оскільки наше дослідження показало, що затримка саме IO-bound, не блокувати потік виконання в очікуванні на довгі запити — непоганий варіант, що точно дасть приріст по швидкості в частині роботи з великою кількістю «важких» запитів.

Враховуючи ці фактори, виглядає дуже перспективно використати саме цю бібліотеку.

До недоліків можна віднести відсутність вбудованого rate-limiter (але є asyncio.Semaphore та інші примітиви синхронізації), у багатьох бібліотеках немає async-версій для простої інтеграції.

Варіант 4. Thread pool executor

Старий добрий thread pool зі стандартної бібліотеки Python (з очевидної IO-bound-природи задачі, не розглядали ProcessPoolExecutor).

Ще один варіант асинхронності — нам надається високорівневе API для запуску тасок в пулі тредів.

Що також дозволяє конкурентно робити запити, та вигравати час на «важких» запитах.

Є певні вимоги щодо тасок, окрім загальних вимог, як для celery-тасок, вони мають бути thread safe, потрібно заздалегідь подумати про обробку потенційних помилок.

Також перспективний кандидат для нашої задачі.

З наведених варіантів вирішили обирати між третім та четвертим, оскільки вони потенційно здатні зменшити загальний час на отримання відповіді від стороннього API, та відповідно, пришвидшити генерацію репорту.

Вирішили швидко порівняти на синтетичних тестах:

Обрали 20 урлів з різних доменів та додали до більшості з них затримку в 1000 мс, для декількох — різні значення від 500 до 3000 мс, це хоч і трохи перебільшено, але відповідає картині запитів від нашого сервісу на стороннє API.

Затримку додавали за допомогою сервісу requestly.io/<delay_ms>/<target_url>.

Для запитів використаємо бібліотеку httpx, адже вона має синхронний та асинхронний інтерфейс.

Спершу запустимо у синхронному форматі, щодо

import httpx

def run_sync(links):
   for url in links:
       httpx.get(url, follow_redirects=True)

отримали результат 41.3 s ± 554 ms per loop.

Це умовно відповідає поточній картині з 20+ важких запитів запущених послідовно.

Для asyncio:

import asyncio
import httpx

async def fetch_url_async(url):
    async with httpx.AsyncClient() as client:
        return await client.get(url, follow_redirects=True)


async def fetch_urls(links):
    tasks = [fetch_url_async(url) for url in links]
    await asyncio.gather(*tasks)


def run_concurrent_async(links):
    loop = asyncio.get_event_loop()
    loop.run_until_complete(fetch_urls(links))

Отримали 3.81 s ± 75 ms per loop (mean ± std. dev. of 7 runs, 10 loops each). Стало трохи краще.

Та з ThreadPoolExecutor:

import httpx

from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch_url(url):
   with httpx.Client() as client:
       response = client.get(url)
       return response


def run_with_threadpool(links, max_workers=None):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {executor.submit(fetch_url, url): url for url in links}
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                result = future.result()
            except Exception as e:
                print(f"Error fetching {url}: {str(e)}")

Тут окремої уваги заслуговує параметр max_workers, якщо залишити його None, то підставиться min(32, os.cpu_count() + 4).

Для тесту, запустимо з дефолтним налаштуванням та з max_workers=25 (щоб гарантовано одночасно мати по одному потоку на один запит).

Результати:

3.89 s ± 89.5 ms per loop (mean ± std. dev. of 7 runs, 10 loops each) з дефолтною кількістю воркерів,

та 3.76 s ± 144 ms per loop (mean ± std. dev. of 7 runs, 10 loops each).

Загалом маємо таку картину:

Результат дещо очевидний, конкурентний запуск в ~10 разів швидший, між варіантами конкурентності різниця мінімальна та не може бути вирішальним фактором.

Ми зупинилися на ThreadPool-варіанті, адже мали thread-safe-клієнт від вендора, та не мали асинхронного.

Щодо загального випадку: вибір за вами та вашими уподобаннями, думаю, швидше ви зустрінетесь з обмеженнями рейт-лімітів, аніж відчуєте різницю у сценаріях, схожих з нашим.

Можемо рекомендувати бібліотеку backoff для встановлення лімітів, ретраїв та роботи з помилками.

Безпосередньо рефакторинг

Отже, визначились з підходом, можемо переходити безпосередньо до рефакторингу.

Поглянемо знову на функцію prepare_section_N:

def prepare_section_N():
    query_N = build_third_party_query_N()
    data_N = execute_query(query_N)
    process_and_save_data_N(data_N)

Найбільший час забирає саме запит на API та очікування відповіді, тож саме цю частину треба готувати до запуску в ThreadPool.

По генерації квері для запиту build_third_party_query_N() стало зрозуміло, що всі необхідні параметри вже є на початку генерації репорту, тобто є можливість побудувати всі квері одразу для всіх секцій. У обробці результатів теж змінено підхід: тепер будемо спочатку отримувати всі дані, а потім обробляти та зберігати.

Який це має вигляд в результаті

Робимо build_queries() та отримуємо dict з ключами по секції та стрінговими квері як значеннями:

queries_by_section = {
    'section_1_query_1': 'query string for section 1 query 1',
    'section_1_query_2': 'query string for section 2 query 2',
    'section_2_query_1': 'query string for section 2 query 1',
    …
    'custom_query_1': 'query string for custom query 1',
}

Далі власне маппінг та сабміт тасок:

import concurrent.futures

from typing import Any


def fetch_query_results(api_client: Any, query_by_section: dict) -> dict:
    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        # Submit tasks to the executor
        future_to_query_str = {
            executor.submit(
                api_client.run_query,
                query_str,
            ): section_query_key
            for section_query_key, query_str in query_by_section.items()
        }
        # Process the results as they become available
        for future in concurrent.futures.as_completed(future_to_query_str):
            section_query_key = future_to_query_str[future]
            query_result = {"result": None, "error": {}, "query_str": query_by_section.get(section_query_key)}
            try:
                query_result["result"] = future.result()
            except api_client.exceptions.ClientException as error:
                query_result["error"] = {"error": error}
            except Exception as error:
                query_result["error"] = {"error": error}
            finally:
                results[section_query_key] = query_result
    return results

На виході отримуємо схожу структуру з результатами:

 results_by_section = {
   'section_1_query_1': {"result": result_data1, "error": {}, "query_str": query_str_1,
   'section_1_query_1': {"result": result_data2, "error": {}, "query_str": query_str_2,
    …
    'custom_query_1': {"result": None, "error": {"error": "error text"}, "query_str": custom_query_str_1},
}

Далі викликаємо process_results(results_by_section), що є фінальним кроком нашої обробки.

Висновки

Отже, за допомогою рефакторингу та стандартної бібліотеки вдалося пришвидшити роботу зі сторонніми API-сервісами до 10 разів, та в три-чотири рази зменшити загальний час генерації репорту, без залучення додаткових ресурсів та витрат на підтримку у майбутньому.

Наступним ботлнеком може стати робота з базою даних та рендеринг звітів. Можливо, у майбутньому ми повернемося до цієї теми, і це стане предметом окремої статті.

З корисних тулзів можливо не так широко відомих на загал хочу відзначити вищезгадані бібліотеки httpx та backoff. В пригоді був сервіс requestly.io. Також чудово себе зарекомендував підхід зберігання JSON-дати з вихідними кверями на сторонній сервіс та відповідями на них. Це дуже допомагало під час розробки та рефакторингу. В результаті у нас зберігаються побудовані квері на сторонній сервіс, відповіді від сервісу та обробленні дані з яких вже будується репорт.

Схожий підхід, тільки з логуванням, згадується в статті мого колеги Павла (розділ 14 «Вхідні та вихідні запити до third-party»).

У цьому дописі я хотів поділитися власним досвідом розв’язання нашої та, як мені здається, досить поширеної проблеми, розглянути альтернативні підходи з їхніми перевагами та недоліками та нагадати про старі-добрі речі зі стандартної бібліотеки Python.

Звичайно, я не пропоную це рішення як універсальне та найкраще для всіх задач. Проте хочу показати, що іноді незначними зусиллями та використовуючи стандартну бібліотеку, можна вирішити завдання. Про такі можливості слід пам’ятати та взяти до уваги.

Сподіваюся, матеріал був корисний для вас, і ви не сильно засмутилися, що ми не обрали варіант на Rust. Чекаю на ваші думки та коментарі.

Підписуйтеся на Telegram-канал «DOU #tech», щоб не пропустити нові технічні статті

👍ПодобаєтьсяСподобалось6
До обраногоВ обраному2
LinkedIn
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter

По-перше, рефакторинг це діяльність яка спрямована на зліпшення прозорості коду та полегшання його розуміння. Таким чином поліпшується прогнозованість часу розробки нових фіч. Те що йдеться в статті радше про performance tuning. Цей тюнінг не має жодного стосунку до рефакторинга а часто прямо йому суперечить.

По-друге, коли вирішується проблема тюнінга роботи зі сторонніми сервісами надважливим є обмеження на стороні АПІ. Тобто скільки запитів (за секунду/хвилину...) готове витримати АПІ. Ігнорування цього чинника може призвести до того що кінцеве рішення призведе до 429 помилки зі сторони серверів АПІ.

По-третє, результати тестів які ви отримали були саме такими тому що в вас 20 урлів і 25 воркерів. Тобто воркерів більше ніж урлів. Оскільки число користувачів сервіса постійно зростає рано чи пізно число урлів на продакшен серверах перевищить число воркерів і картина суттєво зміниться. В такому варіанті асинхронне рішення буде в рази більш ефективне. Щоб це побачити достатньо запустити ті ж самі тести про які йдеться в статті але на 3-4 воркерах.

На мою думку рішенням проблеми все ж таки був варіант з використанням asyncio та переписуванням/заміною/обходом клієнта АПІ.

Щоб asyncio дало перформанс, треба щоб сам фреймоврк був асинхронний. У автора під капотом DRF, яке працює синхронно. Тобто там піднято декілька потоків пайтона, які оброблюють реквести. Якщо всередині потока написати асинхронщину, яку заранити під asyncio.run(), то основний поток все одно буде чекати завершення івент-лупа, перш ніж зможе обробити наступний реквест. Таким чином, асинхронність чи багатопоточність в данному кейсі зможе тільки прискорити час одного реквесту. Для отримання повного перфомансу від асинхронності, треба перейти на asgi та якийсь FastAPI, який під час await зможе перейти до наступного реквесту.

<прискіпливий читач mode on>

у сценаріях, схожих с нашим.

С чи З нашим ?

По генерації квері запиту

Квері хіба то не є англіцизмом для запиту? Чи малось на увазі квері ДЛЯ запиту?
Взагалі якось забагато англіцизмів як на мене — усі ці квері, сабміти, ресерчі, тулзи та боттленеки мають чудові загальновживані аналоги українською

Спершу запустимо у синхронному формат, і щодо

Мабуть малось на увазі форматІ?

П. С.
Хтось взагалі вичитує матеріали перед публікацією?

По темі статті:
Чим міряли час виконання функцій? Судячи зі статті то timeit, але цікаво , може щось краще вже є... чи робили профайлінг коду?

Якщо використовували timeit — як вимірювали що саме в середині

def prepare_section_N():

займає найбільше часу? Там умовно виклики ще 3 допоміжних функцій, кожну з яких варто було б заміряти окремо

Стандартні трюки типу заміни for-loop на map не розглядали?

Взагалі цікаво трошки більше подробиць стосовно цільового звіту ( звичайно можете приховати конфіденційну інформацію) :
Скільки в середньому секцій в одному звіті/ скільки секцій в найбільшому чи найважчому звіті

Який розмір секції або яка суть секції? Я маю на увазі ще щось типу Select month, avg(sales) .... Group by 1
Тобто максимум 12 рядків даних
Або прям мільйон рядків найменшої гранулярності ?
Наскільки «широкі» ці секції- це 12 колонок або 200?

Чи однакові за змістом різні секції чи це абсолютно різні запити до різних таблиць?

Чи розглядали під час рефакторінгу варіант що одна з секцій буде ну прям гігантська- гігантська і поток впаде з ООМ?
Натяк що умовний Spark/ PySpark ці самі запити міг би так само зробити ще й з масштабуванням і RDD з коробки

Дякую за розлогий коментар, увагу до деталей та конструктивну критику.

Щодо помилок — виправив.

Квері хіба то не є англіцизмом для запиту? Чи малось на увазі квері ДЛЯ запиту?

Так, ідея в тому що є 3rd party клієнт зі своїм DSL, відбувається побудова запиту DSL (квері), а згодом клієнт виконує HTTP запит, щоб розділити ці поняття написав таким чином.

Взагалі якось забагато англіцизмів як на мене — усі ці квері, сабміти, ресерчі, тулзи та боттленеки мають чудові загальновживані аналоги українською

Погоджуюсь, автор має профдеформацію в термінології і думає в таких термінах, буду вдячним за вдалі приклади аналогів.

Хтось взагалі вичитує матеріали перед публікацією?

Так, мені допомогали колеги, та наскільки я знаю, з редакції DOU теж продивлялися.

Якщо використовували timeit — як вимірювали що саме в середині

Так використовували timeit по цільових функціях, профайлінг не робили оскільки очевидно, що проблеми з очікуванням I/O.
Вкладені функції не міряли, оскільки не ставили за мету поміряти все, а отримати уявлення про було/стало, також після рефакторінгу деякі вкладені функції зникли, відповідно немає з чим порівнювати. Такої ситуації, що одна-дві вкладені функції займає більшість часу на виконання — не передбачається.

Щодо цифр наведених в порівняльній таблиці — це результати синтетичних тестів на одному наборі даних.

Стандартні трюки типу заміни for-loop на map не розглядали?

Якщо я правильно зрозмів, то ні. Як би це нам допомогло пришвидшити 20+ HTTP запитів, кожен з яких потенційно може займати декілька секунд?

Взагалі цікаво трошки більше подробиць стосовно цільового звіту ( звичайно можете приховати конфіденційну інформацію) :
Скільки в середньому секцій в одному звіті/ скільки секцій в найбільшому чи найважчому звіті

Який розмір секції або яка суть секції? Я маю на увазі ще щось типу Select month, avg(sales) .... Group by 1
Тобто максимум 12 рядків даних
Або прям мільйон рядків найменшої гранулярності ?
Наскільки «широкі» ці секції- це 12 колонок або 200?

Чи однакові за змістом різні секції чи це абсолютно різні запити до різних таблиць?

Спробую надати більше деталей: цільовий звіт містить 4 секції, вони порівнянно рівнозначні, іноді можуть бути пусті, десь це обчисленні/агреговані данні, десь список до 10 колонок «ширини».
Загалом, в граничних випадках мова йшла про тисячі записів «сирих» даних з декількома десятками полів кожен.

Можливо мені не вдалося прозоро це показати в тексті статті — для звіту потрібно побудувати пачку DSL запитів та зібрати-обробити відповіді. Якщо репорт за досить великий проміжок часу — левову частку займає саме очікування відповіді від 3rd party, над цим і працювали, оскільки інший процесінг набагато швидший.

Чи розглядали під час рефакторінгу варіант що одна з секцій буде ну прям гігантська- гігантська і поток впаде з ООМ?
Натяк що умовний Spark/ PySpark ці самі запити міг би так само зробити ще й з масштабуванням і RDD з коробки

Це точно не наш випадок, тому не розглядали.
Думаю «big data» репорти трохи інша і нішева тема, і «мануальна» обробка там дає абсолютно неприйнятний час, тому кому треба обов’язково до них прийде =)

Це ще раз демонструє, що не важливо, який інструмент ви використовуєте — Python, Rust, С++. Головне вміти ним користуватися. Можна і на Rust написати супер не оптимізоване рішення, яке буде в рази гірше за Python.

query можна закешувати в локальній sqlite БД для швидкого доступу, якщо timestamp не змінився, брати звідти.
Було б непогано, якщо б data була стиснена архіватарами, хоча це залежить від 3rd-party ресурсів

Кешування — цікава ідея для загального випадку, десь може дуже доречною.
Для нас не дуже можна застосувати(

Підписатись на коментарі