Сучасна диджитал-освіта для дітей — безоплатне заняття в GoITeens ×
Mazda CX 30
×

Как мы построили real-time аналитическую платформу, используя Kafka, OpenResty и ClickHouse

Привет, меня зовут Василий Макогонский. Больше половины своей жизни я занимаюсь программированием, и уже около 5 лет я работаю CTO в компании Futurra Group.

В этой статье хочу поделиться с вами тем, как у нас получается собирать и анализировать очень большое количество данных (сотни миллионов событий в сутки и больше) со всего мира, что в свою очередь позитивно влияет на развитие продуктов.

«Чтобы что-то улучшить, нужно сначала это измерить»

Эта цитата как нельзя лучше отражает наш итеративный подход к изменениям в продуктах. Перед началом всех изменений и тестов, мы фиксируем показатели, на которые будем ориентироваться для принятия дальнейших решений. Очень простые изменения могут повлечь за собой глобальные изменения в поведении пользователей, а это в свою очередь может привести к замедлению роста проекта. По этой причине мы собираем все события, которые происходят в приложении, чтобы понимать глубину влияния изменений. Миллионы пользователей каждый день используют наши продукты, и наша задача — сохранить каждое событие, которое произошло во время сессии.

Основная задача, которую мы ставили перед собой — создать решение, которое смогло бы максимально универсально, эффективно, и в то же время быстро принимать различные данные. От выручки до логов на бэкендах.

Для этого формализуем и выделим основные наши цели:

  1. Идемпотентность данных.
  2. Отказоустойчивость системы.
  3. Масштабируемость всех частей системы.
  4. Failover.
  5. Возможность останавливать части системы для их модернизации или профилактики.
  6. Сбор данных должен быть универсальным для всех платформ, проектов, задач.
  7. Иметь возможность отправлять зашифрованные данные.
  8. Удобная кастомизация метрик, удобство в добавлении разных разрезов к данным.
  9. Возможность принимать данные максимально эффективно из разных частей мира.
  10. Отображение данных в режиме Realtime (<2s).

Так как количество событий будет исчисляться сотнями миллионов в сутки, потенциально и миллиардами, необходимо использовать систему, которая будет иметь возможность буферизировать, сжимать, подготавливать данные к дальнейшему анализу.

В итоге мы собрали все необходимые инструменты:

  1. GEO-dns (Route53).
  2. OpenResty (Nginx+Lua).
  3. Kafka+Zookeeper.
  4. Consumer tasks (PHP).
  5. ClickHouse.
  6. Supervisor.
  7. Mysql для словарей, определения проектов и метрик.

Для начала нам нужна точка входа в систему — место, куда будем отправлять данные. Также необходимо понимать, что скорость подключения и отправки данных к одному серверу из разных стран будет заметно отличаться, учитывая расстояния и особенности построения мировой сети.

Выбор пал на сервис от Amazon Route53, он же GEO-dns в нашем случае. Есть много других подобных провайдеров. Но у нас уже был успешный опыт работы именно с Route53, который также имеет возможность сделать Failover на случай падения сервера. Это необходимый функционал в построении высоконагруженных систем.

Направляем наш домен на Route53. Для начала необходимо зарегистрироваться на AWS, если еще нет аккаунта. Далее добавляем наш домен в «Hosted zones», а на стороне регистратора меняем NS сервера, которые выдаст AWS в консоли при добавлении нового домена. Обычно это два NS сервера, но, бывает, для большей надежности выдают более четырех.

Следующим этапом будет настройка записей в DNS, чтобы при ответе мы получали наши адреса серверов. Используем записи «A». Если серверов несколько, как в нашем случае, делаем несколько записей. Если необходимо использовать под регион отдельный сервер/серверы, выбираем записи с использованием геолокации (Routing policy). Можно выставить дефолтный IP сервера для всех регионов и, например, несколько — для наиболее клиенто вместительных стран.

В данном случае это будут те серверы, которые у нас принимают HTTP запросы для сбора статистики, то есть первое звено.

Для настройки фейловера нам необходимо указать в Route53 проверку конкретных серверов, которые мы использовали в «A» записях. Для этого создаем в Health checks протокол проверки (в нашем случае HTTP, но если используем сертификаты, то выбираем HTTPS), периодичность и собственно IP наших серверов.

На данном этапе у нас уже есть готовый балансировщик, сервер под регион, фейловер (если в наличии не один, а несколько серверов).

Первым звеном в системе использовали OpenResty. Для чего? По сути это тот же любимый многими веб-сервер Nginx, но со встроенным модулем Lua, который нужен нам для буферизации данных и для оптимизации скорости. Если кратко, то данные приняли в систему максимально быстро и отправили на клиент код 200.

Алгоритм будет работать в следующем порядке. Принимаем кучу запросов по HTTP, собираем эти данные в пачки. К каждому запросу при необходимости (в нашем случае это — базовая информация о запросе, например IP клиента, время запроса в UTC, GEO идентификация по IP). Для ГЕО идентификации используем сервис Maxmind, у которого есть готовые библиотеки для NGINX.

Далее собранные нами данные архивируем посредством библиотек LUA, и отправляем данные на серверы с Kafka.

Все это в памяти, без использования медленных дисков.

Можно использовать не только OpenResty для этого, но и софт от FluentD и т.д. Кому как удобней. В нашем случае входящие данные принимаются по обычному HTTP.

Следующий этап — это очередь. В Kafka можно делить на партиции. Например, 10 партиций и тройной репликацией. Это позволит сохранить данные даже при отказе не одного, а многих серверов в кластере. Но минусом будет конечно же избыточность, так как данные будут сохранены в трех копиях.

Первый и самый главный этап закончен. Данные успешно были приняты в систему для последующей обработки.

Связка Kafka+Zookeeper получилась очень удачной, так как стабильность их работы в кластере показала себя более чем удовлетворительно.

Далее данные нужно обработать, разложить по проектам и метрикам таким образом, чтобы иметь удобную возможность принимать решения на их основе, либо закрывать другие задачи, мониторинг и т.д.

Почему выбор пал на ClickHouse как хранилище?

  • Это колоночная БД, а это значит, что каждая колонка в таблице представлена в файловой системе как отдельный файл. Это удобно, когда у нас нет определенный структуры данных, и в некотором будущем структура будет меняться. Например, когда нужно добавить новый разрез к уже существующей метрике.
  • Данные соединяются (merge) к старым на фоне, как следствие имеем возможность быстро данные отправлять и не ждать реальной вставки на диски.
  • Есть возможность сохранять свежие данные (последние) на SSD или NVM дисках, а уже более старые (например, недельной давности) отправлять на большие HDD диски. Это в свою очередь удешевляет цену сохранности данных. Этот процесс также отрабатывает на фоне.
  • Opensource проект, его постоянное развитие.
  • Куча аналитических встроенных функций.
  • Родной SQL с минимальными отличиями.
  • Возможность делать materialized views, когда нужно делать представление данных как результат работы запроса (select).
  • Масштабируемость. Distributed таблицы, те, которые по сути не имеют в себе данных. А при выборке из них делают распределение на всех шардах.

Данные для простоты организации архитектуры нужно представить в удобном виде. Вот, как это сделали мы. Наш виртуальный проект — это база данных, в которой каждая метрика — это таблица. В свою очередь таблицы имеют колонки, а это у нас разрезы метрик, таким образом мы можем трекать данные и иметь возможность эти данные дополнять различными параметрами или характеристиками.

Еще один важный момент. Все данные мы не будет нормализовать, все сохраняем так, как получили. Потому что скорость выборки для нас более важна на сегодня, чем сам размер баз данных. Стоит и учитывать, что ClickHouse сам сжимает данные после объединения со старыми записями.

Да, за основу обработки данных мы взяли конечно же PHP, так как опыт работы с ним у нас достаточный для решения почти всех задач.

  • Стягиваем данные из Kafka пачками.
  • Достаем оригинал из архива.
  • Если данные были изначально зашифрованы, расшифровываем.
  • Определяем проект, принадлежность к конкретной метрике.
  • Группируем данные по проекту, метрике.
  • Создаем БД в Clickhouse, если такой еще нет.
  • Если были добавлены новые поля к метрикам, добавляем к таблице колонку (делаем alter table).
  • И отправляем уже обработанные, сгруппированные данные в Clickhouse хранилище.

Таких обработчиков необходимо запустить столько, сколько партиций в очереди Kafka. Так как каждая партиция обрабатывается своей таской.

На сегодня мы собираем и анализируем более 800 миллионов запросов в сутки со всех континентов на достаточно небольших серверных мощностях. Наши аналитики, продуктовые менеджеры, а также коллеги, ответственные за направления, имеют возможность качественно собирать, агрегировать и анализировать данные. При необходимости всегда добавляем новые разрезы по данным, проводим тесты и двигаем продукты вперед.

Собирайте данные, и чем больше — тем лучше! Надеюсь, было полезно. Всем успехов!

👍ПодобаєтьсяСподобалось9
До обраногоВ обраному11
LinkedIn
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter

Не могли бы вы подсказать, как именно у вас реализовано вот это?

Есть возможность сохранять свежие данные (последние) на SSD или NVM дисках, а уже более старые (например, недельной давности) отправлять на большие HDD диски.

А еще, не было ли проблем с

Если были добавлены новые поля к метрикам, добавляем к таблице колонку (делаем alter table).

?
Возможно в вашем случае без этого никак.. но представляю когда с таким кол-вом данных, вдруг не проходит альтер и падает с ошибкой

підкажіть будь ласка скільки ресурсів на то пішло?
людей та часу

так метрики или логи?
или все вместе?

Идемпотентность данных

идемпотентность скорее характеристика системы, обработчика или операции чем данных.

Отображение данных в режиме Realtime (<2s).

Судя по тому что вы описали, у вас не realtime система/подход. И дело тут не в latency.

Цікава тема, що клікхауз сам вміє з кафки читати події. Ми так живемо, API на кложі по HTTP приймає події (одну або пачку), пише в кафку, клікхауз читає. ~100 млн в день і навіть не приходиться думати про цю частину системи.

Вибачте, що ворошу старе. Але цікаво, як у вас проходить процес міграції? Я ж так розумію, що є якась схема у таблиці та у топіка кафки, і це залежні речі.

Ну як, додали необов‘язкове поле у клікхауз міграцією, почали писати у кафку. Чи навпаки, насправді, трохи підзабув, чи клікхауз помирає, якщо необов‘язкового поля у повідомленні у кафці нема.

Поля не видаляємо і не перейменовуємо. Є окремі два поля, масиви ключів/значень, які дозволяють в події скласти довільні дані, тому міграцію ми робили аж один раз. :)

Цікавий підхід з зберіганням кожної метрики в окремій таблиці. На скільки зручно працювати з купою таблиць? І взагалі було б цікаво подивитись на приклад структури.

в нас зараз 13643 метрик, тобто таблиць, в біля 800-ста базах даних. насправді зручно, тому що кожний собі створює проект якийсь і починає трекати якісь дані. ясна річ аналітики не з усіма ними працюють) є дані, які трекаються і про них забуваються. буває так, що постало питання і потрібні дані. а ось ці дані давно уже трекаються)

в нас зараз 13643 метрик

всего или типов метрик?
если — первое, то чего так мало? или это какие-то чисто бизнес/application-метрики?

Дякую за статтю
Як швидко PHP справляється з обробкою? Розглядали якісь альтернативи, типу pandas/dask/spark з python?

А почему не и пользуете Kafka Engine + MatView?

Навіщо так ускладнювати?

Пишете сервіс на Go, який через gRPC отримує дані та пачками вставляє в ClickHouse. Ось приклад на Go з репозиторієм.

Ось вам і MaxMind DB Reader for Go.

Як перекваліфікуватись з PHP на Go.

Так от замены nginx+openresty на go сильно проще не станет, если конечно сохранность данных важна.

Всё равно лучше разнести процессы получения данных и записи в какой-то быстрый сторедж. А обработкой (может быть ещё какая-то логика или обращения third party сервисам) и записью в долговременное хранилище отдельно заниматься.

Если можно часть данных потерять (ну там упали сервисы с частично накопленными данными), то можно конечно всё и упростить, без кафки и в одном сервисе сделать получение/сохранение.

Згоден з автором, кафка посередині краще, ніж її відсутність. З клікхаузом можуть бути питання зі схемою, з міграціями тощо, а події не хочеться втрачати. Опенресті як апі виглядає норм, воно досить швидке, щоб не думати за нього.

Прочитайте після цього моменту уважніше:

Да, за основу обработки данных мы взяли конечно же PHP

Коли ClickHouse читає з Kafka то надійно, як у вас, але у автора статті Kafka ➡️ PHP ➡️ ClickHouse.

Ну мені теж не здається, що пхп підходить для того, але це нормально все одно, імхо. Це ж вже десь всередині сховано, можна не перейматися. У сенсі якщо їм вистачає швидкості пхп, то переписування на го нічого ж не дасть.

так для гоферов не важна прагматичность/целесообразность, им надо чтобы все было на го, так как го самый лучший, потому что го самый лучший

Go — найкраща мова програмування за версією самої мови програмування Go

Підписатись на коментарі