Швидко та недорого покращуємо перфоманс невеликого Python-проєкту
Привіт! Мене звати Олег Качур, я — Python-розробник у компанії Yalantis. Протягом своєї кар’єри я працював над різноманітними проєктами, від малих до середніх, як внутрішніх сервісів, досліджень і розробок, так і комерційних продуктів.
Сьогодні я хотів би поділитися цікавим досвідом з практики — рефакторингом невеликого проєкту з мінімальним залученням додаткових ресурсів. Також я розгляну переваги та недоліки альтернативних шляхів розв’язання схожих проблем.
Опис проблеми
Певний час тому до моїх рук потрапив невеликий DRF-проєкт (Django Rest Framework) — сервіс, що генерує репорти на основі даних з 3rd-party-сервісів, що значно зменшує рутину та зберігає час великому відділу компанії на більш творчі та цікаві задачі.
Під час функціонального та acceptance-тестування отримали репорти із зауваженнями про довгий час генерації репорту за період часу 6+ місяців.
Оскільки до сервісу долучалися нові й нові користувачі з новими патернами використання, вирішили переглянути NFR та пошукати можливість прискорення генерації репортів, але з мінімальними зусиллями від розробницької команди/відсутності інфраструктурних змін.
Отже, під час ресерчу переглянув ядро генерації репортів, де побачив картину на кшталт:
def generate_report(): report_data = {} report_data['section_1'] = prepare_section_1() report_data['section_2'] = prepare_section_2() report_data['section_3'] = prepare_section_3() report_data['section_4'] = prepare_section_4() report_data['custom_section'] = prepare_custom_section() return report_data
Примітка: тут функція наведена без аргументів, оскільки вони не мають відношення до нашого топіку (там присутні start/end date
та параметри генерації).
Також варто зазначити, що запити виконуються за допомогою готового клієнту, що надається вендором.
Логічна побудова типової стандартного репорту:
def prepare_section_N() query_1 = build_third_party_query_1() data_1 = execute_query(query_1) process_and_save_data_1(data_1) … query_N = build_third_party_query_N() data_N = execute_query(query_N) process_and_save_data_N(data_N)
Як бачимо, кожна секція типово складається з побудови запиту на third-party, власне запиту, обробки результатів та збереження в базу, і так декілька разів. В реальному коді присутня додаткова логіка, щодо того, які саме запити будувати залежно від інпут-параметрів.
Тобто під час генерації відбувається купа типових послідовних циклів запитів на third-party з їхньою подальшою обробкою та зберіганням. Коли збільшилась кількість даних та користувачі почали обирати більші періоди генерації, лаг став помітним.
За синтетичними даними на тестовому датасеті, час генерації окремих репортів досягав
Очевидно, що подібні сценарії з’явилися після певного часу після запуску сервісу, користувачі та навантаження збільшувались поступово, тож наявний код працював коректно.
Отже, за результатами ресерчу визначились, що ботлнек полягає у головній функції побудови репорту, що послідовно робить значну кількість послідовних запитів на third-party API.
Спробуємо розглянути можливі шляхи вирішення, не забуваючи, що обмежені в ресурсах годин розробницької та інфраструктурної команд.
Розвʼязання проблеми
😅 Перш ніж переписати все на Rust, я вирішив, що варто розглянути різні способи розв’язання цієї досить типової проблеми.
Чи правильно ми користуємось API?
Перш ніж використати нову меджик-технологію, в загальному випадку, буде доречно переглянути алгоритм роботи з third-party API. Поставити собі питання:
- Які саме дані потрібно отримати?
- Чи не отримуємо зайві дані?
- Чи не має запитів, що дублюються та які повертають однакову інформацію, але викликаються декілька разів?
- Можливо деякі записи можна отримувати батчами, а не по одному?
- Чи можна поєднати щось в один запит та скоротити загальну кількість запитів?
В нашому випадку, через специфічні бізнес-вимоги, не вдалось скоротити кількість запитів, але я всім рекомендую починати думати в напрямку оптимізації, саме з логіки роботи з third-party API.
Варіант 1. Celery або аналогічне рішення
Додати celery та огорнути послідовні запити як таски — напевно перше, що прийшло мені на думку.
Переваг у такого підходу дійсно чимало:
- інтеграція з Django;
- багато зручних примітивів у canvas;
- вбудований rate-limiter;
- багатий тулінг та база знань/рішень.
Є і певні недоліки, особливо в контексті нашої задачі.
Якщо обгортати наявний код тасками, то це буде дуже далеко від best practices. До коду тасок є певні вимоги (мають бути короткими, атомарними та ідемпотентними), тобто, щоб їх використати, треба добряче порефакторити наш код.
Також є питання щодо серіалізації даних, додаткові інфраструктурні кости, подальша підтримка та моніторинг.
Загалом це популярне та робоче рішення, але для нашого випадку це оверкіл, та й не підходить під обмеження наведені в першому розділі.
Варіант 2. Синхронізація/реплікація бази
Іноді взагалі можна відмовитись від використання API та працювати з синхронізованою копією або реплікою бази, будувати ефективні запити до власної бази, без використання API. Оскільки дані тепер під рукою, це відкриває широкі можливості по гнучкості запитів та їхньої оптимізації.
З обмежень — це певний лаг, щодо актуальних даних (залежить від деталей реалізації), та звісно, питання доступу до самої бази: мало сервісів готові подібний доступ надати, або потрібно якимось чином підтягувати дані з third-party-бази.
З власного досвіду мушу зазначити, що це д-у-у-же екзотичний або дуже костильний варіант, але все одно вирішив про нього згадати.
В нашому випадку не було єдиної бази third-party-сервісів, а також ми не маємо доступу до бази напряму, тому таке рішення одразу відкинули.
Варіант 3. Використати asyncio
Починаючи з версії python 3.4, asyncio є частиною стандартної бібліотеки.
Документація нам підказує «asyncio is a library to write concurrent code using the async/await syntax» та «asyncio is often a perfect fit for IO-bound and high-level structured network code».
Оскільки наше дослідження показало, що затримка саме IO-bound, не блокувати потік виконання в очікуванні на довгі запити — непоганий варіант, що точно дасть приріст по швидкості в частині роботи з великою кількістю «важких» запитів.
Враховуючи ці фактори, виглядає дуже перспективно використати саме цю бібліотеку.
До недоліків можна віднести відсутність вбудованого rate-limiter (але є asyncio.Semaphore та інші примітиви синхронізації), у багатьох бібліотеках немає async-версій для простої інтеграції.
Варіант 4. Thread pool executor
Старий добрий thread pool зі стандартної бібліотеки Python (з очевидної IO-bound-природи задачі, не розглядали ProcessPoolExecutor).
Ще один варіант асинхронності — нам надається високорівневе API для запуску тасок в пулі тредів.
Що також дозволяє конкурентно робити запити, та вигравати час на «важких» запитах.
Є певні вимоги щодо тасок, окрім загальних вимог, як для celery-тасок, вони мають бути thread safe, потрібно заздалегідь подумати про обробку потенційних помилок.
Також перспективний кандидат для нашої задачі.
З наведених варіантів вирішили обирати між третім та четвертим, оскільки вони потенційно здатні зменшити загальний час на отримання відповіді від стороннього API, та відповідно, пришвидшити генерацію репорту.
Вирішили швидко порівняти на синтетичних тестах:
Обрали 20 урлів з різних доменів та додали до більшості з них затримку в 1000 мс, для декількох — різні значення від 500 до 3000 мс, це хоч і трохи перебільшено, але відповідає картині запитів від нашого сервісу на стороннє API.
Затримку додавали за допомогою сервісу requestly.io/<delay_ms>/<target_url>.
Для запитів використаємо бібліотеку httpx, адже вона має синхронний та асинхронний інтерфейс.
Спершу запустимо у синхронному форматі, щодо
import httpx def run_sync(links): for url in links: httpx.get(url, follow_redirects=True)
отримали результат 41.3 s ± 554 ms per loop.
Це умовно відповідає поточній картині з 20+ важких запитів запущених послідовно.
Для asyncio:
import asyncio import httpx async def fetch_url_async(url): async with httpx.AsyncClient() as client: return await client.get(url, follow_redirects=True) async def fetch_urls(links): tasks = [fetch_url_async(url) for url in links] await asyncio.gather(*tasks) def run_concurrent_async(links): loop = asyncio.get_event_loop() loop.run_until_complete(fetch_urls(links))
Отримали 3.81 s ± 75 ms per loop (mean ± std. dev. of 7 runs, 10 loops each). Стало трохи краще.
Та з ThreadPoolExecutor:
import httpx from concurrent.futures import ThreadPoolExecutor, as_completed def fetch_url(url): with httpx.Client() as client: response = client.get(url) return response def run_with_threadpool(links, max_workers=None): with ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_url = {executor.submit(fetch_url, url): url for url in links} for future in as_completed(future_to_url): url = future_to_url[future] try: result = future.result() except Exception as e: print(f"Error fetching {url}: {str(e)}")
Тут окремої уваги заслуговує параметр max_workers
, якщо залишити його None
, то підставиться min(32, os.cpu_count() + 4)
.
Для тесту, запустимо з дефолтним налаштуванням та з max_workers=25
(щоб гарантовано одночасно мати по одному потоку на один запит).
Результати:
3.89 s ± 89.5 ms per loop (mean ± std. dev. of 7 runs, 10 loops each) з дефолтною кількістю воркерів,
та 3.76 s ± 144 ms per loop (mean ± std. dev. of 7 runs, 10 loops each).
Загалом маємо таку картину:
Результат дещо очевидний, конкурентний запуск в ~10 разів швидший, між варіантами конкурентності різниця мінімальна та не може бути вирішальним фактором.
Ми зупинилися на ThreadPool-варіанті, адже мали thread-safe-клієнт від вендора, та не мали асинхронного.
Щодо загального випадку: вибір за вами та вашими уподобаннями, думаю, швидше ви зустрінетесь з обмеженнями рейт-лімітів, аніж відчуєте різницю у сценаріях, схожих з нашим.
Можемо рекомендувати бібліотеку backoff для встановлення лімітів, ретраїв та роботи з помилками.
Безпосередньо рефакторинг
Отже, визначились з підходом, можемо переходити безпосередньо до рефакторингу.
Поглянемо знову на функцію prepare_section_N
:
def prepare_section_N(): query_N = build_third_party_query_N() data_N = execute_query(query_N) process_and_save_data_N(data_N)
Найбільший час забирає саме запит на API та очікування відповіді, тож саме цю частину треба готувати до запуску в ThreadPool.
По генерації квері для запиту build_third_party_query_N()
стало зрозуміло, що всі необхідні параметри вже є на початку генерації репорту, тобто є можливість побудувати всі квері одразу для всіх секцій. У обробці результатів теж змінено підхід: тепер будемо спочатку отримувати всі дані, а потім обробляти та зберігати.
Який це має вигляд в результаті
Робимо build_queries()
та отримуємо dict
з ключами по секції та стрінговими квері як значеннями:
queries_by_section = { 'section_1_query_1': 'query string for section 1 query 1', 'section_1_query_2': 'query string for section 2 query 2', 'section_2_query_1': 'query string for section 2 query 1', … 'custom_query_1': 'query string for custom query 1', }
Далі власне маппінг та сабміт тасок:
import concurrent.futures from typing import Any def fetch_query_results(api_client: Any, query_by_section: dict) -> dict: results = {} with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: # Submit tasks to the executor future_to_query_str = { executor.submit( api_client.run_query, query_str, ): section_query_key for section_query_key, query_str in query_by_section.items() } # Process the results as they become available for future in concurrent.futures.as_completed(future_to_query_str): section_query_key = future_to_query_str[future] query_result = {"result": None, "error": {}, "query_str": query_by_section.get(section_query_key)} try: query_result["result"] = future.result() except api_client.exceptions.ClientException as error: query_result["error"] = {"error": error} except Exception as error: query_result["error"] = {"error": error} finally: results[section_query_key] = query_result return results
На виході отримуємо схожу структуру з результатами:
results_by_section = { 'section_1_query_1': {"result": result_data1, "error": {}, "query_str": query_str_1, 'section_1_query_1': {"result": result_data2, "error": {}, "query_str": query_str_2, … 'custom_query_1': {"result": None, "error": {"error": "error text"}, "query_str": custom_query_str_1}, }
Далі викликаємо process_results(results_by_section)
, що є фінальним кроком нашої обробки.
Висновки
Отже, за допомогою рефакторингу та стандартної бібліотеки вдалося пришвидшити роботу зі сторонніми API-сервісами до 10 разів, та в три-чотири рази зменшити загальний час генерації репорту, без залучення додаткових ресурсів та витрат на підтримку у майбутньому.
Наступним ботлнеком може стати робота з базою даних та рендеринг звітів. Можливо, у майбутньому ми повернемося до цієї теми, і це стане предметом окремої статті.
З корисних тулзів можливо не так широко відомих на загал хочу відзначити вищезгадані бібліотеки httpx та backoff. В пригоді був сервіс requestly.io. Також чудово себе зарекомендував підхід зберігання JSON-дати з вихідними кверями на сторонній сервіс та відповідями на них. Це дуже допомагало під час розробки та рефакторингу. В результаті у нас зберігаються побудовані квері на сторонній сервіс, відповіді від сервісу та обробленні дані з яких вже будується репорт.
Схожий підхід, тільки з логуванням, згадується в статті мого колеги Павла (розділ 14 «Вхідні та вихідні запити до third-party»).
У цьому дописі я хотів поділитися власним досвідом розв’язання нашої та, як мені здається, досить поширеної проблеми, розглянути альтернативні підходи з їхніми перевагами та недоліками та нагадати про старі-добрі речі зі стандартної бібліотеки Python.
Звичайно, я не пропоную це рішення як універсальне та найкраще для всіх задач. Проте хочу показати, що іноді незначними зусиллями та використовуючи стандартну бібліотеку, можна вирішити завдання. Про такі можливості слід пам’ятати та взяти до уваги.
Сподіваюся, матеріал був корисний для вас, і ви не сильно засмутилися, що ми не обрали варіант на Rust. Чекаю на ваші думки та коментарі.
7 коментарів
Додати коментар Підписатись на коментаріВідписатись від коментарів