Покращуємо використання кешу для частих високонавантажених запитів
Привіт! Я Сергій, вже 5 років працюю зі Scala. Зараз на позиції тімліда середньої за розміром бекенд-команди у компанії VeliTech. Ми часто стикаємося із задачами обробки великої кількості запитів, жорсткими вимогами до часу відповіді на них та складною бізнес-логікою.
У цій статті пропоную розглянути як була вирішена проблема tail latency amplification для однієї з наших бізнес-задач — акумулювання балів гравців на кожен окремий запит з різними параметрами.
Матеріал буде корисний тим, хто розробляє високонавантажені бекенд-сервіси з вимогами стабільних та низьких затримок відповідей.
У чому проблема
Якщо ваша серверна програма обробляє запити, що створюють значне навантаження на саму програму, базу даних або навіть сторонні сервіси — це може викликати труднощі. Зокрема, після закінчення терміну дії кеша для певних запитів суттєво зростає затримка.
Що вищий ваш RPS (кількість запитів на секунду), то більше одночасних запитів буде спричиняти небажане навантаження та додаткові затримки. Ці затримки виникають через накопичення одночасно виконуваних обчислень. Якщо такі обчислення є критично трудомісткими та суттєво впливають на ресурси вашої системи, затримка може стрімко зростати, доки перший запит не завершиться і обчислене значення не буде додане до кешу.
Випадок VeliTech
Ми зіткнулися з подібною проблемою в одному з наших мікросервісів, що обробляє бали досягнень у різних іграх. Цей мікросервіс надає API-ендпоінт для агрегації балів за будь-який період часу. Такий процес вимагає значних ресурсів бази даних через необхідність робити агрегацію великої кількості історичних даних, join-нити декілька таблиць та вираховувати кінцевий результат. За бізнес-логікою це схоже на побудову лідерборду, але такого, що будується відповідно до параметрів запиту.
Для оптимізації ми використовуємо Redis для кешування результатів і маємо можливість масштабувати сервіс. Проте виконання агрегаційного запиту під навантаженням займає в середньому
Наслідки цієї проблеми ілюструє скріншот з логів ELK, знятий з нашого гейтвею:
Порівняно з кількістю запитів не так вже й погано:
Але варто покращити ситуацію, щоб не змушувати деяких користувачів чекати.
Рішення
Basic
Для вирішення проблеми нам потрібно робити тільки одне обчислення для усіх нових «важких» запитів, коли немає запису в кеші (наприклад, після завершення TTL).
Перше рішення, яке спадає на думку — застосувати concurrent map, в якій ключем буде той самий ключ, що і для кешування, а значенням — Promise того обчислення, яке буде запущенно вперше. Якщо значення є — беремо його і очікуємо на результат, якщо немає — запускаємо обчислення і також очікуємо. Спрощений приклад коду на Scala:
case class CalculationResult(someExpensiveCalculationValue: Long) trait Cache { def get(key: String): Future[Option[CalculationResult]] def set(key: String, value: CalculationResult): Future[Unit] } val cache: Cache = ??? // todo: implement def keyFromUri(uri: URI): String = ??? // todo: implement val locks = new java.util.concurrent.ConcurrentHashMap[String, Promise[CalculationResult]]() def expensiveCalculation() = Future { Thread.sleep(42000) CalculationResult(42) } def endpointLogic(URI: URI) = { cache.get(keyFromUri(URI)).flatMap { case None => val promise = Promise[CalculationResult]() val previousComputation = Option(locks.putIfAbsent(keyFromUri(URI), promise)).map(_.future) previousComputation.getOrElse { for { result <- expensiveCalculation() _ = promise.success(result) _ <- cache.set(keyFromUri(URI), result) _ = locks.remove(keyFromUri(URI)) } yield result } case Some(cachedValue) => Future.successful(cachedValue) } } }
Intermediate
Начебто на цьому можна етапі можна сказати, що проблема вирішена, але спробуємо ускладнити завдання. Що, якщо наш сервіс буде горизонтально масштабуватися? Кожна окрема репліка буде створювати окрему калькуляцію для нашого випадку. Якщо реплік буде багато, то ми знову ризикуємо отримати періодичне небажане навантаження.
Спробуємо розв’язати і цю проблему. Найпростішим рішенням (оминаючи ускладнення типу distributed state поміж репліками) буде використання стороннього сервісу, який триматиме в собі стан обчислення. Не обовʼязково, але бажано розв’язати проблему конкурентного доступу, аби точно мати тільки одне обчислення для одного ключа. Це можна зробити різними шляхами, в залежності від можливостей стороннього сервісу (наприклад, наявності або відсутності атомарних операцій). Так як ми вже потребуємо рішення для кешування, то можна використати той же Redis. По-перше, ми типово вирішуємо проблему кешування для реплікованих сервісів, по-друге — зʼявляється інструментарій для реалізації нашої задумки — дедублікації виконання важких, часто запитуваних обчислень.
Отож, як буде виглядати рішення на основі Redis-а в цьому випадку? Спершу треба визначити механізм синхронізації конкурентних запитів. Для цієї задачі існує дуже простий та нативний інструмент — операція set з флажком NX (if not exist). В офіційній документації є окремий розділ про реалізацію різних типів lock-ів, рекомендую його прочитати для розуміння цього підходу. В нашому випадку підійде проста реалізація на базі єдиного екземпляру Redis.
Можна скористатись бібліотеками, в яких вже реалізований цей підхід, наприклад, Radisson для Java. На жаль, відповідної реалізації для існуючих Scala-бібліотек я не знайшов.
Далі спробуємо замінити concurrentHashMap на redisClient:
val client = org.redisson.Redisson.create() //init with appropriate config val lock: RLock = client.getLock("myLock") def endpointLogic(URI: URI) = { cache.get(keyFromUri(URI)).flatMap { case None => lock.lockAsync(10, TimeUnit.SECONDS).toScala.flatMap { _ => cache.get(keyFromUri(URI)).flatMap { //recheck cache after acquiring lock case None => (for { result <- expensiveCalculation() _ <- cache.set(keyFromUri(URI), result) } yield result).andThen { case _ => lock.unlockAsync() } case Some(cachedValue) => lock.unlockAsync().toScala.map(_ => cachedValue) } } case Some(cachedValue) => Future.successful(cachedValue) } }
В результаті маємо наступну логіку: якщо у кеші немає значення за ключем, намагаємось отримати lock, перевіряємо, що за час очікування блокування не зʼявилось нового значення в кеші, і тільки після цього починаємо виконувати наші обчислення. Таким чином ми гарантуємо дедублікацію всіх запитів на виконання обчислення, незважаючи на горизонтальне масштабування.
Advanced
Можна зупинитись на попередньому варіанті реалізації, якщо вас влаштовують періодичні підвищення часу очікування запиту, хоч вже і без зайвого навантаження на систему. Але спробуємо вирішити цю проблему. Давайте припустимо, що одноманітні запити приходять більш-менш стабільно в рамках якогось невеликого проміжку часу (у рамках TTL вашого кеша). В таких випадках можна покращити час очікування за допомогою попереднього завантаження даних в кеш.
Нам потрібно буде визначити часове вікно, в рамках якого ми з високою вірогідністю встигнемо провести наші обчислення. Та це вікно не буде перевищувати TTL кеша і буде значно менше за нього, хоча б в половину. Суть цього рішення полягає в тому, щоб порівняти час, який залишився до кінця життя кешу (created at + TTL) та визначене часове вікно для попереднього завантаження.
Якщо часу до кінця життя запису в кеші менше, ніж є це вікно — запускати обчислення в фоні програми. Для цього потрібно буде розширити значення, яке зберігаємо в кеші, додавши до нього time-поле, куди будемо писати час запису в кеш. Додамо сюди попередні напрацювання з використанням lock-ів й отримуємо наступне рішення:
case class CachedValue(time: LocalDateTime, value: CalculationResult) val ttlSec = 120 val preLoadIntervalSec = 20 private def ttlLeftFrom(time: LocalDateTime): Long = { val passedSince = Duration.between(time, LocalDateTime.now).getSeconds (ttlSec - passedSince).max(0) } def endpointLogic(URI: URI) = { cache.get(keyFromUri(URI)).flatMap { case None => lock.lockAsync(10, TimeUnit.SECONDS).toScala.flatMap { _ => cache.get(keyFromUri(URI)).flatMap { //recheck cache after acquiring lock case None => (for { result <- expensiveCalculation() _ <- cache.set(keyFromUri(URI), CachedValue(LocalDateTime.now, result)) } yield result).andThen { case _ => lock.unlockAsync() } case Some(cachedValue) => lock.unlockAsync().toScala.map(_ => cachedValue) } } case Some(cachedValue) => val ttlLeft = ttlLeftFrom(cachedValue.time) val isPreLoadTime = ttlLeft <= preLoadIntervalSec if (isPreLoadTime) { (for { _ <- lock.lockAsync(10, TimeUnit.SECONDS).toScala result <- expensiveCalculation() _ <- cache.set(keyFromUri(URI), CachedValue(LocalDateTime.now, result)) } yield result).andThen { case _ => lock.unlockAsync() } } Future.successful(cachedValue) } }
В більшості випадків воно дозволяє не відчувати збільшення часу очікування запиту під час експірації кеша та забезпечує стабільний час відгуку системи.
За необхідності можна використати більш продвинуті імплементації lock-ів, як-от Spin Lock та Fenced Lock. Нижче наведена діаграма послідовностей, яка описує фінальну реалізацію.
Висновок
Проблему tail latency amplification можна вирішити доволі ефективно та відповідно до специфіки роботи системи. В простішому випадку нам буде достатньо вбудованих у мову механізмів. В найскладнішому випадку знадобиться допомога сторонніх сервісів, але вже тривіальних для подібного роду систем. Описане рішення дає змогу позбутися 99% tail latency випадків.
Якщо знайдете якісь недоліки або корнер-кейси цього рішення, пишіть про них в коментарях. З радістю подивлюсь, відповім і прийму челлендж на впровадження покращень.
7 коментарів
Додати коментар Підписатись на коментаріВідписатись від коментарів