Починаємо роботу з Apache Kafka. Частина I
Підписуйтеся на Telegram-канал «DOU #tech», щоб не пропустити нові технічні статті.
Всім привіт. Я Сергій Моренець, розробник, викладач, спікер та технічний письменник. Хочу поділитися з вами своїм досвідом роботи з такою цікавою технологією як Apache Kafka та розкрити ті теми, які з нею пов’язані.
Я вже розповідав про event-driven architecture в одній із попередніх статей, але там йшлося більше про теорію і менше стосувалося практики. У цій статті я хотів би розповісти про конкретні кроки, які Java розробник повинен зробити, щоб отримати повноцінну систему обміну повідомленнями на базі Kafka. Постійно стикаючись з цією темою, у мене набралося достатньо матеріалу для окремої статті.
Крім того, ми розглядаємо цей матеріал на деяких тренінгах. Тому в цій статті я наведу як переваги та особливості цієї технології, так і практичні приклади її використання. Щоб приклади вийшли реалістичнішими, ми візьмемо кілька різних платформ, щоб показати, як інтегрувати Kafka з кількома сучасними фреймворками. Оскільки це дуже велика тема і всю її неможливо розглянути в одній статті (тим більше, з практичними прикладами), то я вирішив розділити її на кілька частин. У цій частині більше йтиметься про особливості та конкурентів Apache Kafka.
Messaging
З чого починаються розмови про взаємодію мікросервісів? Швидше за все, з REST API, а точніше із RESTful web systems. Це один з найбільш популярних підходів для комунікації через свою простоту і легкість реалізації. Але у REST API є свої мінуси та недоліки:
- REST API спочатку створювався для надсилання команд та запитів (queries), але не подій (нотифікацій).
- Сервіс-відправник може надіслати запит (request) тільки одному одержувачу, якого він повинен знати заздалегідь, таким чином виникає сильна пов’язаність (tight coupling) між відправником і одержувачем.
- Використовуючи REST API, складно реалізувати гарантовану доставку повідомлення у разі недоступності чи завантаженості одержувача.
- Відправник повинен чекати закінчення обробки запиту, а якщо це призводить до надсилання нових запитів, то це ще більше збільшує загальне latency.
Як нам вийти з цього становища? Нам потрібен підхід, який би дозволив:
- Прибрати тісний зв’язок між одержувачем та відправником.
- Дозволить динамічно додавати нових одержувачів.
- Надати засіб для гарантованої та максимально швидкої відправки/отримання/зберігання нотифікацій.
По суті, нам потрібна реалізація такої структури даних як черга (для наших нотифікацій). Але не та черга (Queue), яка існує в JDK, а більш досконала, яка зберігає дані поза JVM на дисковій підсистемі, підтримує high-availability (failover) і масштабування. Такі технології (або платформи) існують давно і в залежності від своєї функціональності є:
- Messaging queuing service (StormMQ, Amazon SQS).
- Message-oriented middleware або message broker (RabbitMQ).
- Stream processing або event streaming (Apache Kafka).
Однак ці системи об’єднує одне — вони призначені для того, щоб пересилати повідомлення (messages) між відправником і одержувачем (чи споживачем). Тому ми не помилимося, якщо використовуватимемо для них такий загальний термін як messaging systems.
Такі системи мають досить складну архітектуру, використовують різні математичні алгоритми для обробки та відправлення даних, тому їх пишуть не з нуля, а використовуючи готові напрацювання або ті ж патерни.
Огляд messaging systems
Ця стаття присвячена Kafka, але перед тим, як зайнятися нею впритул, цікаво розглянути ті системи для messaging/event-streaming, які є її конкурентами або попередниками. Справа в тому, що Kafka досить складна в налаштуванні та управлінні і, звичайно, не є ідеальним рішенням на всі випадки життя. Тому огляд альтернативних систем дозволить зрозуміти, для яких випадків варто використовувати саме Kafka, а для яких — інші messaging systems.
Почнемо з тих систем, які з’явилися раніше:
- ZeroMQ
- ActiveMQ
- RabbitMQ
Давайте розберемо їх особливості більш детально, але перед цим поговоримо про уніфікацію та специфікації в системах обміну повідомлень. І справді, такі специфікації як JPA, JAX-RS, JSF або Bean Validation дозволяють нам легко змінювати провайдерів, використовуючи загальні анотації та API. У Java (Jakarta) EE є специфікація і для messaging — JMS (Java message service), яка була пізніше перейменована в Jakarta Messaging API.
JMS 1.0 з’явився в першій версії Java EE 1.0 в 1998 році і розвивався досить неквапливо, принаймні версія 1.1 була випущена тільки в 2012 році, в Java EE 7. Зараз поточною є Jakarta Messaging 3.0.1 (як частина Jakarta EE 9.1). Специфікація JMS визначає основні компоненти messaging system:
- JMS провайдер.
- JMS клієнт.
- JMS відправник/одержувач.
- JMS повідомлення.
Неспішний розвиток JMS призвів до того, що не всі messaging systems її підтримують (цілком або повністю), а ті, які підтримують, в основному це роблять для того, щоб дозволити legacy проєктам перейти на свою технологію. Отже, перейдемо до опису технологій.
ZeroMQ з’явився у 2007 році. Спочатку він був написаний на C++ під кодовою назвою libzmq, але потім був портований найпопулярнішими мовами програмування, починаючи від Java і закінчуючи Erlang. Головна особливість ZeroMQ у тому, що це не платформа, а бібліотека, і вона не потребує виділеного сервера (message broker). Відповідно, вона не може зберігати повідомлення (на диску), хоча у неї є внутрішня буферизація в пам’яті, і якщо ваш сервіс відправив повідомлення, а у нього немає передплатників, або вони недоступні, то повідомлення буде втрачено.
ZeroMQ працює на базі TCP/IP та сокетів і пропонує асинхронну доставку/отримання повідомлень, підтримку патернів request/reply та publish/subscribe. У той же час тут немає message channels, але є фільтри підписки (subscription filters), які можна налаштувати для прийому тільки потрібних повідомлень. Для обміну повідомленнями використовується внутрішній протокол ZMTP 3.1. ZeroMQ позиціонує себе як легковажна messaging library. Benchmarks показують, що вона забезпечує пропускну спроможність до 4.5 мільйона повідомлень/сек.
ActiveMQ — open-source проєкт, що з’явився ще в 2004 році і розробляється під егідою Apache Foundation з 2007 року. Це повноцінний message broker, який реалізує JMS 1.1. Цікаво, що крім основного проєкту Apache ActiveMQ, активно розвивається нова версія ActiveMQ Artermis, яка повинна в майбутньому замінити основний проект. Artemis з’явився у 2014 році після поглинання проєкту JBoss HornetQ та його вихідного коду. ActiveMQ включає сховище повідомлень, причому може зберігати дані на диску, або в базі даних за допомогою JDBC. Сам ActiveMQ написаний на Java, але дозволяє інтеграцію з безліччю клієнтів іншими мовами програмування. Таким чином, ви можете обмінюватися повідомленнями в кросплатформовій системі, тому що для спілкування використовуються універсальні протоколи AMQP, STOMP (надбудова над WebSocket) або OpenWire. Більше того, ви можете надсилати повідомлення за допомогою REST API. Для систем управління є підтримка кластеризації і реплікації (за допомогою Zookeeper).
RabbitMQ — ще потужніша платформа, створена в 2007 році, а в 2010 році перейшла під крило Spring Source (а потім VMware Tanzu). Написана досить екзотичною мовою — Erlang, але пропонує клієнти практично для всіх популярних мов програмування (а також інтеграцію з Spring Framework). RabbitMQ позиціонує себе як message broker, а точніше як Exchange Server, який є її ядром. До переваг RabbitMQ належить її розширюваність завдяки плагінам, а також підтримка сучасних протоколів AMQP, STOMP і MQTT. Вона також підтримує кластеризацію та роботу в хмарі (AWS, Google Cloud, Azure), як і інтеграцію з основними засобами автоматизації розгортання інфраструктури — Chef, Puppet, Kubernetes та Docker. Окрема увага приділяється моніторингу, який можна проводити із командного рядка або Web UI. Ще одна перевага RabbitMQ — правила маршрутизації, які дозволяють фільтрувати або перенаправляти повідомлення між чергами.
Ці три технології дуже давно використовуються в комерційних проектах і добре відомі. Але за останні пару років з’явилися ще 2 перспективні технології, які починають активно завойовувати ринок:
RedPanda — система, яка почала розроблятися компанією Vectorized у
Chronicle Queue написана на Java (хоча є і C++ версія) і почала розроблятися набагато раніше, в 2016 році. Вона підтримує RMI, патерн Publish/Subscribe і на відміну від Kafka є не брокером, а фреймворком, який ви можете використовувати у своїх додатках. Тобто ви запускаєте кілька JVM додаткiв на одному серверi, які через API працюють із повідомленнями на диску. Chronicle Queue використовує memory-mapped файли для роботи з повідомленнями, що дозволяє досягти максимальної швидкодії роботи. Як і у випадку з RedPanda, тут є комерційна Enterprise Edition, а також деякі круті фічі від Kafka — реплікація даних та шифрування.
Apache Kafka
Apache Kafka — це кажучи образно, пізня дитина у сім’ї. Він з’явився в той час, коли вже існували основні гравці на цьому ринку, але вирішив зайняти нову нішу — високопродуктивні платформи для потокової обробки повідомлень. Тому офіційно це не message broker, а шина для event streaming. Як ви бачите, тут замість повідомлення використовується синонім подія (event).
Отже, розробка Kafka стартувала в 2009 році в компанії LinkedIn двома мовами — Java і Scala. У 2011 році Kafka, як і багато інших проектів цієї компанії, стала open-source, а її автор, Джей Крепс, у 2014 році заснував свій стартап Confluent для доопрацювання Kafka у плані enterprise features для численних клієнтів. Таким чином, ви могли використовувати Kafka у базовому варіанті (як Apache Kafka), або з додатковими обважуваннями від Confluent як Conflient platform (яка існує у безкоштовному та enterprise варіанті). Але Confluent не припиняла розробку базової платформи, роблячи до 80% всіх commits.
Завдяки чому Apache Kafka стала такою популярною:
- Це розподілена система з підтримкою реплікації даних.
- Kafka пропонує максимальну ефективність обробки даних завдяки тому, що її черги (топіки) поділяються на partitions (паттерн Competing Consumers), що підтримують паралельну обробку.
- Всі повідомлення зберігаються на диску та не видаляються після обробки.
- Гарантована доставка та отримання повідомлень.
- Є клієнти для всіх мов програмування.
- Можна використовувати в системах реального часу.
Недарма Kafka була включена в так званий SMACK стек (Spark, Mesos, Akka, Cassandra, Kafka), який фактично є стандартом для обробки великих обсягів даних. Але технологічно Kafka дуже складна, тому що використовує ще одну технологію Apache Zookeeper для зберігання конфігурації, управлінням кластером та багатьох інших адміністративних справ. Тому, якщо вам потрібна high-availability і failover, вам потрібен кластер, що включає сервери Apache Kafka, так і Zookeeper.
Крім тих фіч, які є у більшості розглянутих нами технологій, у Kafka є ще три технології, без яких вже важко її уявити:
- Kafka Streams — обробка та перетворення повідомлень з одного топіка в інший засобами Kafka серверів.
- Kafka Connect — імпорт повідомлень із вихідних джерел даних (наприклад, СУБД чи файли) та експорт у зовнішні системи (ElasticSearch, Hadoop).
- Ksqldb — сервер, який пропонує SQL-схожу мову з підтримкою CLI для роботи з повідомленнями так, ніби це дані, які знаходяться в реляційній СУБД.
Які додаткові компоненти є у Confluent platform:
- Control Center
- Kafka connectors
- Self-balancing clusters
- Replicator
- Auto Data balancer
- JMS Ciient
- Tiered Storage
- Підтримка Kubernetes
Як ви бачите, це дуже потужна технологія, яка виросла зі статусу «перспективна» і, по суті, не має конкурентів у своїй ніші. Вона повністю задовольняє всі вимогИ до системи обміну повідомленнями, які ми намітили, тому я пропоную використовувати саме її.
Zookeeper. To be or not to be
Спочатку особливістю Kafka було те, що вона виступала в ролі брокера, системи для відправки та зберігання повідомлень. Багато адміністративних завдань, такі як зберігання метаданих та вибори лідера в кластері, були покладені на сторонню систему — Apache Zookeeper. Zookeeper — open-source технологія для розподіленого зберігання даних, яка була обрана на роль Kafka Controller. По суті, це key-value база даних дуже швидка, так як зберігає дані в пам’яті, розроблена як fault-tolerant.
Але поступово стало зрозуміло, що використання Zookeeper не йде на користь Kafka:
- Необхідно було вивчити та вміти налаштовувати Zookeeper, який також потрібно запустити у своєму кластері.
- Zookeeper виявився не настільки ефективним на випадок, коли кількість брокерів і патріцій досягає сотень і тисяч одиниць.
- Zookeeper — це окремий проєкт, на розвиток якого розробники Kafka не можуть впливати.
- Конфігурація та метадані кластера зберігалися у двох різних системах, що призводило до певних проблем розсинхронізації при збоях та відновленні системи в цілому.
Тому Kafka 2.8 вперше з’явилася можливість запускати брокер без використання Zookeeper. Це виявилося не так просто технологічно, для цих цілей було розроблено новий протокол Kafka Raft (KRaft) та новий MetadataFetch API. Тепер Kafka сервер може виконувати одну з двох ролей (або обидві):
- брокер;
- контролер.
Роль брокера не змінилася, а ось контролер (або кворум контролер) тепер управляє метаданими кластера і повідомляє про зміни за допомогою подій, які також зберігаються в Kafka. Це спрощує поведінку брокерів, яким треба просто підписатися на певний топік (metadata topic) і отримувати оновлення. Таким чином, у разі збою брокера йому потрібно буде після перезавантаження лише підтягнути останні зміни, а не стан кластера. Оскільки контролерів має бути кілька, з них вибирається так званий активний контролер, який і буде відповідальним за керуванням всім кластером і реплікацію даних для інших контролерів. Якщо ви не вказуєте роль для Kafka-вузла, то вважається, що він працює в legacy-режимі, тобто йому все ж таки потрібен буде Zookeeper для роботи.
Хоча планувалося, що в Kafka 3.0 новий підхід буде визнаний безпечним для роботи і вільним від критичних помилок, навіть у версії 3.1, яка вийшла в січні 2022 року, він все ще оголошений працюючим у режимі «preview». Крім того, досі немає автоматичного способу конвертації існуючих систем (з Zookeeper) у новий режим (без нього). Без цього використовувати його в production практично неможливо, оскільки ручна конвертація займе занадто багато часу і загрожує помилками.
Ми для цієї статті можемо вибрати будь-яку модель. Але цікавіше спробувати і обкатати новий варіант, оскільки ми тепер можемо обійтися одним Kafka сервером, який буде і брокером, і контролером. Тим більше, що саме ця модель стане основною (і єдиною) у Kafka.
Зберігання даних у Kafka
Основною бойовою одиницею в Kafka є повідомлення, яке аналогічно за своєю роллю запису в базі даних. Будь-яка операція, будь-який Kafka сервіс працює із повідомленнями. Повідомлення складається з наступних полів:
- ключ;
- значення;
- час відправлення (timestamp);
- метадані (заголовки) — з’явилися у Kafka 0.11.
Ключ і значення є байтовим масивом, і Kafka не особливо цікавить вміст. Ви можете вибрати будь-який формат (JSON, Avro, Protobuf) для значень. Тільки значення і timestamp є обов’язковими полями. Такий підхід докорінно відрізняється від реляційних баз даних, де кожен запис є унікальний первинний ключ. Чому так? Насамперед тому, що первинний ключ у БД використовується для запитів і для зберігання записів (вони всі відсортовані за первинним ключем). У Kafka унікальним ключем для кожного запису є offset — її зміщення від початку загальної черги. Але ключ, зрозуміло, теж важливий, насамперед для споживачів, оскільки він логічно дозволяє визначити, якого об’єкту належить повідомлення.
Коли повідомлення потрапляє в брокер Kafka, воно поміщається в один з topics. Що таке topic (предмет) і навіщо він потрібний? У найпростішому випадку черга повідомлень є великим глобальним списком, куди відправляються всі повідомлення, які потім доставляються споживачам. Це схоже на дискову підсистему, яка не підтримує папки, і всі файли зберігаються в кореневому каталозі. Але найчастіше учасникам комунікації потрібні лише певні типи повідомлень. Тому в Kafka всі повідомлення зберігаються в topics, які схожі на папки і логічно поєднують повідомлення за якоюсь функціональною ознакою. Відправники і споживачі самі повинні домовитися, який topic вони будуть використовувати і для цього вказати його ім’я (унікальне в межах брокера). Зрозуміло, такий topic повинен існувати на момент відправлення повідомлення або спеціальна властивість auto.create.topics.enable має бути встановлено в true. Аналогічна концепція існує, наприклад, в MongoDB, де колекція створюється за запитом, при першому використанні.
Topics можна порівняти і таблицею у базі даних, але якого типу: реляційною чи NoSQL? Чи потрібно зберігати в topic’е повідомлення лише одного типу? У реляційній СУБД це єдиний можливий варіант, тоді як у NoSQL базі даних документи можуть мати довільну структуру. Істина, як завжди, посередині. Якщо у нас є безліч подій, які відносяться, наприклад, до платежу (PaymentCreated, PaymentSuccess, PaymentError, і т. д.), то ми можемо створити для кожного з них окремий topic. І це спростить їхню обробку. Але тоді ми не можемо гарантувати, що їх буде оброблено в тому ж порядку, що й відправлено. А це створити небезпечну можливість рідкісних колізій, причину яких буде важко знайти. Тому загальне правило таке. В одному topic повинні бути повідомлення, які передбачають певний порядок їх обробки (зазвичай, це повідомлення, які відносяться до однієї сутності).
Проте topics у Kafka не є атомарними структурами. Кожен topic складається з розділів (partitions), при чому повідомлення не дублюються за розділами. Така схема зберігання аналогічна кошикам (buckets) у HashMap і була задумана для прискорення отримання повідомлень споживачами. Якщо ми маємо 5 споживачів, то створивши 5 розділів, ми можемо асоціювати кожен з них зі своїм споживачем для того, щоб вони паралельно і незалежно один від одного отримували повідомлення. При цьому Kafka гарантує, що повідомлення з одним і тим ключем потраплять в той самий розділ.
Але в такому разі можливі колізії. Якщо у нас 10 розділів, а як первинний ключ використовується, наприклад, ідентифікатор (email) користувача, то якщо для якихось користувачів дуже багато повідомлення, то їх розділи будуть переповнені. А розділи, які стосуються інших користувачів, будуть практично порожні. Цього можна уникнути, генеруючи ключ як випадкове число (UUID), але тоді повідомлення, які стосуються одного користувача, будуть у різних розділах. І не гарантується, що порядок надсилання повідомлень буде таким самим, як порядок їх отримання (оскільки розділи заповнені не однаково, і швидкість отримання різна). Якщо порядок важливий, то доведеться повернутися до першого варіанта.
Що собою фізично представляє розділ? Це простий лог-файл, який зберігає повідомлення. Але оскільки повідомлень може бути сотні тисяч і мільйони, то такий файл зростав би з величезною швидкістю, що ускладнювало операційну систему роботу з ним. Тому Kafka розбиває цей файл на окремі файли (сегменти) фіксованого розміру. Як тільки якийсь сегмент заповнюється повідомленнями, створюється новий порожній.
Таким чином, для того, щоб відправники та споживачі могли обмінюватися повідомленнями через Kafka, вони повинні заздалегідь домовитися про три речі:
- Назва topic’a.
- Тип ключа (і його наявність чи відсутність).
- Формат значення у повідомленні (наприклад, JSON або Protobuf).
Як бачимо, Kafka поєднує риси і messaging system, і сховища даних. Але якщо в класичній базі даних повідомлення зберігаються вічно, то в Kafka в цьому немає необхідності. Який сенс зберігати повідомлення, які вже були прочитані та опрацьовані споживачами? Швидше за все, вони більше нікому не знадобляться. Це така сама проблема, як і зберігання лог-файлів. Тому тут прийнято політику очищення старих повідомлень (retention) виходячи з максимального ліміту кількість повідомлень чи максимальний час їх зберігання. Також Kafka може стискати розділи та видаляти старі повідомлення, шукаючи дублікати за ключом.
Висновки
У першій частині ми детально ознайомилися з Apache Kafka, її особливостями, які відрізняють її від конкурентів та попередників. По суті, розпочавшись як open-source проект під крилом Apache Software Foundation, Kafka за допомогою компанії Confluent перетворилася на потужну платформу для зберігання та обробки повідомлень, у тому числі потокових.
У наступній частині ми розглянемо, як використовувати її на практичних прикладах.
15 коментарів
Додати коментар Підписатись на коментаріВідписатись від коментарів