Покращуємо використання кешу для частих високонавантажених запитів

Привіт! Я Сергій, вже 5 років працюю зі Scala. Зараз на позиції тімліда середньої за розміром бекенд-команди у компанії VeliTech. Ми часто стикаємося із задачами обробки великої кількості запитів, жорсткими вимогами до часу відповіді на них та складною бізнес-логікою.

У цій статті пропоную розглянути як була вирішена проблема tail latency amplification для однієї з наших бізнес-задач — акумулювання балів гравців на кожен окремий запит з різними параметрами.

Матеріал буде корисний тим, хто розробляє високонавантажені бекенд-сервіси з вимогами стабільних та низьких затримок відповідей.

У чому проблема

Якщо ваша серверна програма обробляє запити, що створюють значне навантаження на саму програму, базу даних або навіть сторонні сервіси — це може викликати труднощі. Зокрема, після закінчення терміну дії кеша для певних запитів суттєво зростає затримка.

Що вищий ваш RPS (кількість запитів на секунду), то більше одночасних запитів буде спричиняти небажане навантаження та додаткові затримки. Ці затримки виникають через накопичення одночасно виконуваних обчислень. Якщо такі обчислення є критично трудомісткими та суттєво впливають на ресурси вашої системи, затримка може стрімко зростати, доки перший запит не завершиться і обчислене значення не буде додане до кешу.

Випадок VeliTech

Ми зіткнулися з подібною проблемою в одному з наших мікросервісів, що обробляє бали досягнень у різних іграх. Цей мікросервіс надає API-ендпоінт для агрегації балів за будь-який період часу. Такий процес вимагає значних ресурсів бази даних через необхідність робити агрегацію великої кількості історичних даних, join-нити декілька таблиць та вираховувати кінцевий результат. За бізнес-логікою це схоже на побудову лідерборду, але такого, що будується відповідно до параметрів запиту.

Для оптимізації ми використовуємо Redis для кешування результатів і маємо можливість масштабувати сервіс. Проте виконання агрегаційного запиту під навантаженням займає в середньому 1–2 секунди, а при високих навантаженнях цей час значно зростає (аж до 30 секунд).

Наслідки цієї проблеми ілюструє скріншот з логів ELK, знятий з нашого гейтвею:

Порівняно з кількістю запитів не так вже й погано:

Але варто покращити ситуацію, щоб не змушувати деяких користувачів чекати.

Рішення

Basic

Для вирішення проблеми нам потрібно робити тільки одне обчислення для усіх нових «важких» запитів, коли немає запису в кеші (наприклад, після завершення TTL).

Перше рішення, яке спадає на думку — застосувати concurrent map, в якій ключем буде той самий ключ, що і для кешування, а значенням — Promise того обчислення, яке буде запущенно вперше. Якщо значення є — беремо його і очікуємо на результат, якщо немає — запускаємо обчислення і також очікуємо. Спрощений приклад коду на Scala:

case class CalculationResult(someExpensiveCalculationValue: Long)

trait Cache {
 def get(key: String): Future[Option[CalculationResult]]


 def set(key: String, value: CalculationResult): Future[Unit]
}


val cache: Cache = ??? // todo: implement


def keyFromUri(uri: URI): String = ??? // todo: implement


val locks = new java.util.concurrent.ConcurrentHashMap[String, Promise[CalculationResult]]()


def expensiveCalculation() = Future {
 Thread.sleep(42000)
 CalculationResult(42)
}


def endpointLogic(URI: URI) = {
 cache.get(keyFromUri(URI)).flatMap {
   case None              =>
     val promise = Promise[CalculationResult]()
     val previousComputation = Option(locks.putIfAbsent(keyFromUri(URI), promise)).map(_.future)
     previousComputation.getOrElse {
       for {
         result <- expensiveCalculation()
         _ = promise.success(result)
         _ <- cache.set(keyFromUri(URI), result)
         _ = locks.remove(keyFromUri(URI))
       } yield result
     }
   case Some(cachedValue) =>
     Future.successful(cachedValue)
 }
}
}

Intermediate

Начебто на цьому можна етапі можна сказати, що проблема вирішена, але спробуємо ускладнити завдання. Що, якщо наш сервіс буде горизонтально масштабуватися? Кожна окрема репліка буде створювати окрему калькуляцію для нашого випадку. Якщо реплік буде багато, то ми знову ризикуємо отримати періодичне небажане навантаження.

Спробуємо розв’язати і цю проблему. Найпростішим рішенням (оминаючи ускладнення типу distributed state поміж репліками) буде використання стороннього сервісу, який триматиме в собі стан обчислення. Не обовʼязково, але бажано розв’язати проблему конкурентного доступу, аби точно мати тільки одне обчислення для одного ключа. Це можна зробити різними шляхами, в залежності від можливостей стороннього сервісу (наприклад, наявності або відсутності атомарних операцій). Так як ми вже потребуємо рішення для кешування, то можна використати той же Redis. По-перше, ми типово вирішуємо проблему кешування для реплікованих сервісів, по-друге — зʼявляється інструментарій для реалізації нашої задумки — дедублікації виконання важких, часто запитуваних обчислень.

Отож, як буде виглядати рішення на основі Redis-а в цьому випадку? Спершу треба визначити механізм синхронізації конкурентних запитів. Для цієї задачі існує дуже простий та нативний інструмент — операція set з флажком NX (if not exist). В офіційній документації є окремий розділ про реалізацію різних типів lock-ів, рекомендую його прочитати для розуміння цього підходу. В нашому випадку підійде проста реалізація на базі єдиного екземпляру Redis.

Можна скористатись бібліотеками, в яких вже реалізований цей підхід, наприклад, Radisson для Java. На жаль, відповідної реалізації для існуючих Scala-бібліотек я не знайшов.

Далі спробуємо замінити concurrentHashMap на redisClient:

val client = org.redisson.Redisson.create() //init with appropriate config
val lock: RLock = client.getLock("myLock")


def endpointLogic(URI: URI) = {
 cache.get(keyFromUri(URI)).flatMap {
   case None =>
     lock.lockAsync(10, TimeUnit.SECONDS).toScala.flatMap { _ =>
       cache.get(keyFromUri(URI)).flatMap { //recheck cache after acquiring lock
         case None =>
           (for {
             result <- expensiveCalculation()
             _ <- cache.set(keyFromUri(URI), result)
           } yield result).andThen {
             case _ => lock.unlockAsync()
           }
         case Some(cachedValue) =>
           lock.unlockAsync().toScala.map(_ => cachedValue)
       }
     }
   case Some(cachedValue) =>
     Future.successful(cachedValue)
 }
}

В результаті маємо наступну логіку: якщо у кеші немає значення за ключем, намагаємось отримати lock, перевіряємо, що за час очікування блокування не зʼявилось нового значення в кеші, і тільки після цього починаємо виконувати наші обчислення. Таким чином ми гарантуємо дедублікацію всіх запитів на виконання обчислення, незважаючи на горизонтальне масштабування.

Advanced

Можна зупинитись на попередньому варіанті реалізації, якщо вас влаштовують періодичні підвищення часу очікування запиту, хоч вже і без зайвого навантаження на систему. Але спробуємо вирішити цю проблему. Давайте припустимо, що одноманітні запити приходять більш-менш стабільно в рамках якогось невеликого проміжку часу (у рамках TTL вашого кеша). В таких випадках можна покращити час очікування за допомогою попереднього завантаження даних в кеш.

Нам потрібно буде визначити часове вікно, в рамках якого ми з високою вірогідністю встигнемо провести наші обчислення. Та це вікно не буде перевищувати TTL кеша і буде значно менше за нього, хоча б в половину. Суть цього рішення полягає в тому, щоб порівняти час, який залишився до кінця життя кешу (created at + TTL) та визначене часове вікно для попереднього завантаження.

Якщо часу до кінця життя запису в кеші менше, ніж є це вікно — запускати обчислення в фоні програми. Для цього потрібно буде розширити значення, яке зберігаємо в кеші, додавши до нього time-поле, куди будемо писати час запису в кеш. Додамо сюди попередні напрацювання з використанням lock-ів й отримуємо наступне рішення:

case class CachedValue(time: LocalDateTime, value: CalculationResult)


val ttlSec = 120
val preLoadIntervalSec = 20


private def ttlLeftFrom(time: LocalDateTime): Long = {
 val passedSince = Duration.between(time, LocalDateTime.now).getSeconds
 (ttlSec - passedSince).max(0)
}


def endpointLogic(URI: URI) = {
 cache.get(keyFromUri(URI)).flatMap {
   case None              =>
     lock.lockAsync(10, TimeUnit.SECONDS).toScala.flatMap { _ =>
       cache.get(keyFromUri(URI)).flatMap { //recheck cache after acquiring lock
         case None              =>
           (for {
             result <- expensiveCalculation()
             _ <- cache.set(keyFromUri(URI), CachedValue(LocalDateTime.now, result))
           } yield result).andThen {
             case _ => lock.unlockAsync()
           }
         case Some(cachedValue) =>
           lock.unlockAsync().toScala.map(_ => cachedValue)
       }
     }
   case Some(cachedValue) =>
     val ttlLeft = ttlLeftFrom(cachedValue.time)
     val isPreLoadTime = ttlLeft <= preLoadIntervalSec
     if (isPreLoadTime) {
       (for {
         _ <- lock.lockAsync(10, TimeUnit.SECONDS).toScala
         result <- expensiveCalculation()
         _ <- cache.set(keyFromUri(URI), CachedValue(LocalDateTime.now, result))
       } yield result).andThen {
         case _ => lock.unlockAsync()
       }
     }
     Future.successful(cachedValue)
 }
}

В більшості випадків воно дозволяє не відчувати збільшення часу очікування запиту під час експірації кеша та забезпечує стабільний час відгуку системи.

За необхідності можна використати більш продвинуті імплементації lock-ів, як-от Spin Lock та Fenced Lock. Нижче наведена діаграма послідовностей, яка описує фінальну реалізацію.

Висновок

Проблему tail latency amplification можна вирішити доволі ефективно та відповідно до специфіки роботи системи. В простішому випадку нам буде достатньо вбудованих у мову механізмів. В найскладнішому випадку знадобиться допомога сторонніх сервісів, але вже тривіальних для подібного роду систем. Описане рішення дає змогу позбутися 99% tail latency випадків.

Якщо знайдете якісь недоліки або корнер-кейси цього рішення, пишіть про них в коментарях. З радістю подивлюсь, відповім і прийму челлендж на впровадження покращень.

Підписуйтеся на Telegram-канал «DOU #tech», щоб не пропустити нові технічні статті

👍ПодобаєтьсяСподобалось11
До обраногоВ обраному5
LinkedIn
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter

Цікаво яка структура БД і запити? Я так зрозумів що перший запит завжди довго виконується. Може краще спробувати вирішити проблему оптимізувавши БД та запити? Наприклад створити якісь проміжні таблиці для таких запитів. Чи робити попередню вибірку даних...

В данному випадку запит є ресурсоємним, але максимально оптимізований і виконується за 500-1500 мс, в залежності від вибірки. Проблема в тому, що багато паралельних таких запитів починають створювати проблему. Банально можуть забиватися сесії з пула або вичерпуватися ресурси, виділені під саму СКБД.
Оптимізація того запиту в базу могла би бути дискусійною, але в даному кейсі фокус статті направлений на те, як діяти, якщо маємо саме такі обмеження.

Чому горизонтальне масштабування репліками, а не шардінгом?
Кожен шард обслуговує свої ключі, відповідно, його кеш має тримати в рази менше користувачів, а значить — може їх дані тримати в рази довше. І не треба синхронізації локів кеша між репліками — бо однакові ключі завжди падають на один шард.

Ну це вже коли багато реплік\даних, коли 2-4 репліки бо один інстанс не вивозить запити, то достатньо дубової реплікі та раунд робін і не заморочуватись із розбалансуванням данних\запитів між шардами та агрегацією данних між ними, бо не завжди все так чьотенько живе в своєму щарді. А головне що шардінги не дають авілабіліті гарне та партішен толеранс, а отже треба робити повноцінній consistent hashing де в тебе і репліки і шардінг... Вобщем то вже треба петрати щоб по нормальному дизайнити таке :-)

А якщо там ботлнек в витягуванні даних(база), а не в обробці(аплікейшен), то може взагалі потрібно репліку на базу зробити? Знову без особливого гемору.

Вобщем там багато питань і так мало відповідей :-)

Саме так, але за ще одну репліку бази доведеться платити. Хоча проблема виникає тільки «в моменті» і стабільного навантаження, яке б виправдало реплікацію бази немає. Це просто оптимізація, яка покращує час відповіді для більшості запитів, наскільки це можливо.
До речі, шардування, яке пропонується вище — це цілком валідне вирішення цією проблеми, але як на мене складніше та дорожче по часу розробки. Воно буде виправдано, якщо у вас є інші причини його додати.

Не зрозумів чому репліка сервісу є проблемою. Усі репліки по ідеї генерують ті самі агрегації, редіс підтримує реплікацію також і там навіть локи є. Тобто реплікований редіс та універсальний ключ до данних мають вирішити проблему.

Реплікація сервісу не є проблемою. Просто її можна врахувати та не робити унікальне обчислення на кожній репліці окремо, а робити тільки на одній. Проста дедублікація обчислень. Це не буде помітно на 2-3 репліках, але на десятках чи сотнях точно відчуватиметься.

Підписатись на коментарі