Як ми використовували AWS DMS та Kafka Streams для міграції на мікросервісну архітектуру

💡 Усі статті, обговорення, новини про DevOps — в одному місці. Приєднуйтесь до DevOps спільноти!

Привіт, я Артем, працюю Senior Java інженером в компанії Geniusee. У цій статті я поділюся нашим досвідом використання CDC (на базі AWS DMS) та Stream processing (на базі Kafka Streams) для задачі міграції з 8+ річного моноліту на мікросервісну архітектуру. Покажу кінцеву архітектуру, розкажу про переваги та недоліки такого рішення. Я не буду заглиблюватися в те, як приймалося рішення переходу на мікросервіси і чому, але поверхнево опишу процес.

В статті використовуються англійські слова та іноді англіцизми, старався мінімізувати де зміг, але буду радий вислухати пропозиції.

Вступ

Почну з того що проєкт на якому я працював стосувася EdTech. Один з найбільших сервісів для онлайн-навчання у Великій Британії, з тисячами активних користувачів та десятками тисяч активних онлайн-уроків. Це все керувалося відносно великим монолітом, який розроблявся багатьма командами більше ніж 8 років, з доволі консервативним стеком: Java 8 (потім мігрували на Java 11), Jakarta EE(Wildfly), JSF(OmniFaces and PrimeFaces), MySQL.

На щастя, з самого початку інженери проєкту заклали дуже хорошу базу. Компонентний підхід до розробки де кожен компонент має свої залежності та лежить в окремому пакеті з мінімальною зв’язністю. Крім того, ці компоненти покривалися динамічними фіча прапорцями які адмін контролював через UI. Новим інженерам було досить легко влитися в стиль роботи.

З часом компанія почала стрімко розвиватися, функціонал почав додаватися дуже швидко. З’явилося більше ніж 5 команд по 4-6 людей, які одночасно працювали над різними компонентами перетинаючись час від часу між собою. Якість коду почала падати, review стали децентралізованими. Команди працювали в різних режимах (Scrum або Kanban) з різними циклами. Була створена черга деплою та тестування на певному середовищі кожною командою. В чаті розробників ми мали умовний токен, що контролював чергу тестування, деплою та релізу. Токен потрібно було брати перед тестуванням функціоналу та віддавати після релізу. Це створювало досить таки великі затримки, іноді вони ставали ще більшими через adhoc-фікси. Все це затримувало реліз фіч. Зокрема, кодова база настільки росла що всі технічні процеси (review, testing, CI/CD) стали значно довшими. Бізнесу було важливо швидко релізити конкретну частину вимог.

На фоні цього було прийнято рішення поступово відокремлювати частини моноліту та робити їх як окремі мікросервіси. Іншими словами, ідея була піти шляхом на кшталт «Strangler Fig Pattern». З самого початку це були зовсім маленькі та критичні компоненти. Я не буду заглиблюватися в деталі, як саме обирали функціонал, який потрібно винести. Часто це були event-storming сесії в Miro з окремими командами враховуючи бізнес-контекст. Звісно такі міграції не проходять легко, одною з основних проблем децентралізованих систем є узгодженість даних. В мікросервісній архітектурі зокрема це також проблема рішення відповідальності за дані. А також що робити, якщо один мікросервіс потребує даних з іншого (наприклад, payment сервіс потребує даних з user profile service чи навпаки). В ідеальному світі ми маємо ізольовані мікросервіси проте на практиці це реалізувати важко.

Існує багато способів обміну даними між сервісами. Певно, найпростіший — синхронний HTTP-запит, коли один сервіс напряму звертається до іншого за потрібними даними. Альтернатива — асинхронна комунікація, коли сервіси не викликають один одного безпосередньо, а реагують на події чи повідомлення про зміни даних. У нашій архітектурі ми використовували обидва підходи залежно від контексту.

Для асинхронної взаємодії обрали Kafka (AWS MSK) брокер. Під час декомпозиції виникало все більше залежностей на одні й ті самі дані моноліту. Наприклад, дані про уроки потрібно було отримувати на різних мікросервісах (tutor-service, payment-service, pupil-service), щоб оновлювати рейтинг викладачу, знімати кошти з рахунку, повідомляти учнів. Отже, виникала потреба на мікросервісах робити певні дії коли конкретний запис(в нашому випадку уроку) змінюється. На цьому етапі потрібно було вирішити як саме дані з моноліту мають опинитися на різних мікросервісах.

Ми можемо робити якусь операцію на моноліті, наприклад, скасування уроку. Змінювати його статус в базі даних, а після цього надсилати подію в Kafka topic з усіма змінами. Інші мікросервіси отримують асинхронну подію про ці зміни та роблять відповідні дії.

Але на практиці це створює кілька проблем:

Модифікація моноліту. Щоб відправити подію — треба оновлювати код моноліту, тестувати, деплоїти. Це ризики, знову ж таки, розширення функціоналу на самому моноліті. Те чого б ми хотіли уникнути. Крім того, зміни в подіях, наприклад, додавання атрибута до уроку вимагали б оновлення на мікросервісах та моноліті. Задача була максимально ізолюватися від моноліту. До того ж ми знову перетинаємося з командами, які продовжують підтримувати моноліт.

Гарантія надсилання події в брокер та оновлення бази даних. Це одна з проблем розподілених систем, коли є потреба оновити дані на двох сховищах даних одночасно атомарно. У нашому випадку потрібно оновити базу даних та надіслати подію в Kafka за одну транзакцію. Якщо ми оновимо статус уроку в базі, а наступною операцією надішлемо подію про зміну статусу — то є ймовірність що при надсиланні події виникне помилка. В такому випадку потрібно робити rollback в базі. Але може навіть статися таке, що сам сервер просто впаде. Тоді і rollback не вийде зробити, отже подія втрачена. Якщо спочатку відправляти подію, а потім зберігати в базу — то це ще гірший варіант, оскільки є ймовірність що ми надішлемо оновлення мікросервісам, ті зроблять відповідні дії, а на етапі збереження в базу виникне помилка. В такому випадку мікросервіси отримають оновлення, яке фактично не збереглося.

З особистої практики виглядає, що на початкових фазах проєкту або в дуже швидкому середовищі, по типу стартапів, цією проблемою дуже не переймаються, приймають ризик, що подія може бути втрачена в крайніх випадках. Тобто спочатку зберігають дані в базу, а потім надсилають події в брокер.

Є різні підходи, як це вирішити. Наприклад, distributed transactions, але це все складні рішення, які потребують технічної експертизи, а також дуже вагомих причин. Також автор книжки «Microservices patterns» пропонує альтернативу: transactional outbox. Принцип доволі непоганий, проте конфліктує з попереднім пунктом «Модифікація моноліту».

Альтернативним варіантом вирішення двох проблем є використання CDC-підходу, на якому і зупинилися.

Change data capture

Change Data Capture (CDC) — це технологія, яка відстежує всі зміни в базі даних у режимі реального часу вставки, оновлення, видалення і передає їх у вигляді оновлень до інших систем. Простіше кажучи, CDC «слухає» сховище даних, помічає, коли відбуваються INSERT, UPDATE чи DELETE в таблицю та надсилає ці оновлення в місце призначення. Це може бути брокер повідомлень, інша база даних, AWS S3 і так далі.

CDC дозволяє отримувати події про зміни в базі даних, не змінюючи код моноліту взагалі. Цей процес повністю ізольований від основного застосунку та може розгортатися, оновлюватися й масштабуватися незалежно від моноліту. Також це дозволяє мікросервісам гарантовано надсилати події після того, як зміни запишуться в базу. Покриваємо і другу проблему.

Отже, ми зупинилися на тому, що можна інтегрувати CDC, але питання: «Яке рішення використовувати?»

AWS DMS

На ринку є достатньо різних CDC-рішень, таких як Debezium, Airbyte і так далі. Проте більшість з них це сторонні сервіси, які треба або розгортати та підтримувати, або впроваджувати в існуючу інфраструктуру. Після аналізу різних варіантів ми зупинилися на AWS DMS (Database Migration Service), оскільки це нативне рішення від AWS, яке найпростіше інтегрується з нашою поточною інфраструктурою, швидко конфігурується та не вимагає додаткового управління середовищем.

Взагалі DMS першочергово розроблявся як сервіс для міграції даних з одного джерела даних в інше. Він гарно підходить для тимчасових операцій, тобто як рішення, яке має виконати свою задачу і на цьому все. Тим не менш, DMS має і ongoing replication функцію, яка працює в режимі CDC. Якщо коротко — то в нашому випадку це працює так:

  1. DMS налаштовується на «слухання» MySQL-бази.
  2. Читає WAL (Write-Ahead Log) — журнал транзакцій, де записані всі зміни в таблицях.
  3. Для кожної зміни (INSERT, UPDATE, DELETE) формує подію з before/after image даних.
  4. Відправляє цю подію у вибране місце призначенн я (Kafka topic).

AWS DMS підтримує source (вхідні) та target (вихідні) джерела. Для нашого випадку потрібно було зчитувати всі операції з таблиць бази MySQL (AWS RDS) та публікувати у відповідні Kafka топіки (AWS MSK). Для цього використовується вбудоване DMS поняття task.

Task (задача) — це основна одиниця роботи, яка виконує реплікацію або міграцію даних між джерелом та місцем призначення (source). Ці задачі запускаються на provisioned instance, по факту це віртуальна машина (EC2), яку ви можете обрати.

До речі, станом на зараз AWS DMS підтримує і serverless-підхід.

Отже, один такий інстанс може запускати багато задач. Одна задача може читати зміни з багатьох таблиць в базі даних (1-N). В більшості випадків ми використовували зв’язку: одна задача читає одну таблицю (1-1). Цей підхід має декілька переваг:

  1. Контроль над налаштуваннями. Задачі можуть мати різну частоту оновлень, місце призначення і так далі.
  2. Масштабування. Одна задача може бути дуже навантажена (наприклад, дуже багато оновлень в таблиці), а інша задача може простоювати. В нашому випадку таблиця уроків оновлювалася набагато частіше ніж, наприклад, таблиця шкільної програми. Отже, можна масштабувати окремі таски.
  3. Помилки, які виникають на реплікації однієї таблиці, ніяк не впливають на реплікацію іншої. Якщо не працює зчитування оновлень з таблиці уроків, то оновлення з таблиці шкільних програм буде працювати.

Як було сказано вище, задача зчитує зміни з таблиці та відправляє їх у відповідний топік брокера. Ці зміни можуть містити дані до та після оновлення. Якщо ми змінюємо статус уроку — то отримуємо приблизно таку подію в топіку:

  { 
   "data": {
     "id": 961,
     "createdOn": "2025-07-08T00:18:58Z",
     "modifiedOn": "2025-07-08T00:18:58Z",
     "version": 0,
     "status": "COMPLETED",...
   },
   "before": {
     "id": 961,
     "createdOn": "2025-07-08T00:18:58Z",
     "modifiedOn": "2025-07-08T10:18:58Z",
     "version": 0,
     "status": "IN_PROGRESS",...
   },
   "metadata": {
     "timestamp": "2025-07-08T10:18:58Z",
     "record-type": "data",
     "operation": "update",
     "partition-key-type": "primary-key",
     "transaction-id": 123
   }
 }

Мікросервіси можуть зчитувати події та порівнювати попередні дані з поточними. Але кілька важливих нюансів із форматом вихідних даних AWS DMS. Він досить сирий, не зовсім те, що потрібно на мікросервісах. По-перше, ключ partition події формується як комбінований schema.table.primary_key, що не збігається з доменним ключем, який очікують наші сервіси. По-друге, формати полів не стандартизовані під наш контракт: прості типи (наприклад, boolean) приходять як 1/0, а складні масиви/JSON можуть мати іншу структуру, ніж очікують мікросервіси.

Щоб вирішити ці моменти, можна використати вбудований в AWS DMS механізм transformation, який дозволяє змінювати події перед тим, як вони попадуть у місце призначення. Варіант робочий, але дуже обмежений. Іншим варіантом може бути створення окремого мікросервісу, який буде приймати події від DMS з Kafka, форматувати їх як потрібно та надсилати в інший топік. Цей варіант вимагає зусиль, проте більш гнучкий, до того ж поверх цього ще накладаються додатковий функціонал, який описаний далі.

Простих оновлень з таблиці стало недостатньо, як виявлося, багато мікросервісів залежать не тільки на дані самої події, а ще й на зв’язні дані. Наприклад, багатьом сервісам при отриманні події про оновлення уроку потрібні також дані про поточних учнів цього уроку. Або сервіси, які опрацьовують зміну даних викладача, потребують також актуальний на момент часу графік цього викладача. Оскільки ми старалися будувати мікросервіси в DDD-стилі, то таких випадків стало більше, особливо коли є досить обширний aggregate. Отже, виникла потреба доповнювати події додатковою інформацією. Знову ж таки, тут є декілька варіантів, як це зробити. Основні, з яких ми розглядали:

  1. Мікросервіс, який потребує додаткові дані при опрацюванні події, робить HTTP-запит на моноліт та отримує те, що потрібно. Мінуси очевидні, залежність на моноліт та його додаткове навантаження, опрацювання помилок.
  2. Наповнити подію додатковими даними до моменту, як її зчитують мікросервіси. Мікросервіси отримують все, що потрібно безпосередньо в події.

Ми обрали другий варіант, ось тут і стало в нагоді stream processing рішення за допомогою Kafka Streams.

Kafka Streams

Kafka Streams — це клієнтська бібліотека, яка працює з брокером повідомлень Kafka.

Використовується для опрацювання потокових даних, тобто повідомлень (events) від брокера, агрегації цих даних, перетворення і так далі. Дозволяє швидко та лаконічно будувати топології (ланцюжки з опрацювання подій). На відміну від інших stream processing фреймворків по типу Apache Flink, Apache Spark, Hazelcast Jet — Kafka Streams не потребує змін інфрастуктури та розгортання окремого інстансу.

Достатньо додати бібліотеку в існуючий сервіс. Проте ціна, яку ви за це платите менший функціонал і можливість інтеграції лише з Kafka-брокером. Для роботи бібліотека створює додаткові топіки в Kafka та використовує диск для збереження стану (про це скажу пізніше).

Kafka Streams дозволяє закрити нашу потребу доповнювати та перетворювати, фільтрувати і модифікувати події, які CDC відслідковує та надсилає в брокер. Ось приклад простої топології.

public static KStream<String, TutorEvent> modifyAndFilterTutorChanges(KStream<String, TutorCDCEvent> stream) {
   return stream
     .filter((key, change) -> change != null && change.coreFieldsChanged())
     .mapValues(TutorChange::toNormalizedEvent)
     .peek((k, v) -> log.info("TutorChange OUT: key{} value{}",k, v));
}

KStream — це абстракція над вхідним потоком даних з топіку Kafka. Отже, ви отримуєте потік подій, фільтруєте, змінюєте його та знову відправляєте в інший топік, але вже як TutorEvent.

Нижче наведу трішки цікавіший приклад, коли є потреба доповнити подію оновлення уроку з відповідною подією викладача (якщо такий присутній) та перетворити це все в загальну подію, яку зчитує сервіс, що займається шкільними програмами.

public KStream<String, ProgramLessons> programLessons(
        KTable<String, Lesson> lessonTable, 
        KTable<String, Tutor> tutorTable) {

    return lessonTable
            .leftJoin(tutorTable,
                Lesson::tutorId,
                ProgramLessonsChanges::toTutoredLesson,
                Materialized.with(Serdes.String(), tutoredLessonJsonSerde)
            )
            .mapValues(ProgramLessonsChanges::toProgramLessons)
            .toStream()
            .peek((key, value) -> log.info("Key:{}, Value:{}", key, value));
}

Як бачимо на прикладі, тут використовується KTable замість вищезгаданого KStream.
KTable — це абстракція, яка дозволяє зберігати події на локальному диску кожного інстансу. Це потрібно для того, щоб зберігати останній відомий стан кожного ключа.

За рахунок того, що локальний стан збережено, можна буквально робити join двох топіків з брокера по спільному id. Тобто якщо є топік, куди приходять оновлення по окремих уроках, а також топік, куди приходять оновлення по викладачах, то ми можемо зробити join уроку з викладачем по id викладача. Join відбудеться з останнім актуальним станом викладача, що був в топіку. Концепція дуже схожа на звичайний SQL, що дозволяє об’єднувати різні таблиці.

Наведу ще один приклад, якщо ми отримували декілька оновлень того самого викладача, тобто декілька подій від CDC. Наприклад, викладач змінив свою пошту на [email protected], потім ще раз на [email protected]. Ми отримуємо 2 події в CDC-топіку. KTable буде зберігати лише останню подію [email protected].

Таким чином подію уроку ми доповнимо актуальною інформацією про викладача.

Крім join-операцій, Kafka Streams дає цілий набір інших можливостей.

Exactly-once delivery. Exactly-once delivery у Kafka Streams гарантує, що кожна подія опрацьовується лише один раз. Streams використовує вбудовані транзакції Kafka: коли подія зчитується з вхідного топіка, оновлення state store і запис результату у вихідний топік відбуваються в межах однієї транзакції. Це гарантує узгодженість стану та відсутність дублікатів.

Шикорий спектр stream processing операторів. Можна не лише читати події, а й агрегувати, фільтрувати, трансформувати та комбінувати їх у реальному часі. Використовувати windowing-техніки. Це дозволяє створювати бізнес-логіку просто в потоці, без проміжних БД і без окремих ETL-шарів.

Проста масштабованість. Обробка автоматично масштабується разом із кількістю partitions у Kafka. Тобто найпростіший спосіб масштабувати Kafka Streams сервіс — це збільшити кількість partitions та додати відповідну кількість інстансів Kafka Streams.

Легка підтримка та розширюваність. Kafka Streams не потребує окремого кластера чи менеджера — це звичайна бібліотека, яка працює поверх Kafka topics. Тому її легко впровадити в існуюче рішення. Топологія описується звичайним Java-кодом. Додати нову гілку обробки чи змінити схему подій — кілька рядків, без додаткової інфраструктури. Streams добре інтегрується зі Spring Boot, Spring Clould Streams і легко деплоїться у звичайний мікросервіс.

Вбудований KSQL (Kafka SQL). Для швидких експериментів або ад-hoc аналітики є вбудований інструмент, за допомогою якого запитом типу SQL можна витягнути дані, які опрацьовують Kafka Streams інстанси. Отже, ми буквально робимо SQL-like запит та будуємо результат на даних реального часу, які опрацьовує Kafka Streams з різних топіків.

Хоча виглядає так, що цей функціонал скоро не буде розвиватися взагалі, натомість використовується альтернатива.

Архітектура рішення

В кінцевому результаті вийшла наступна інтеграція:

Щоб отримати дані моноліту на конкретних мікросервісах, проходить ланцюжок опрацювань:

  1. Користувач або система оновлює дані, що зберігаються в MySQL через моноліт.
  2. AWS DMS реагує на оновлення та надсилає подію у вхідний Kafka-топік.
  3. Cdc processor мікросервіс зчитує дані від DMS в «сирому» вигляді, фільтрує, модифікує, доповнює даними інших топіків.
  4. Cdc processor надсилає результат у вихідний Kafka-топік.
  5. Конкретний мікросервіс, який зацікавлений в такому оновленні, зчитує вихідний топік та опрацьовує повідомлення.

Висновки

Перехід із моноліту до мікросервісів — це завжди компроміс з точки зору підходу та рішень. Комбінація CDC та Stream Processing дозволила нам зробити цей перехід поступово, контрольовано та ізольовано. Ми отримали архітектуру, де сервіси реагують на зміни даних у моноліті.

У нашому випадку реалізація CDC була виконана за допомогою AWS DMS, а Stream processing за допомогою Kafka Streams. Загалом міграція на мікросервісну архітектуру може відбуватися багатьма способами (це гарно описано в книзі). Використовували CDC — лише один з них, який ми обрали. Особливість цього підходу в тому, щоб надати тимчасове рішення з отримання даних з моноліту для мікросервісів.

Переваги рішення

  • Ізоляція мікросервісів без втручання в моноліт, швидка інтеграція змін в проді без великих ризиків.
  • Підготовка мікросервісів до повноцінної event-driven архітектури.
  • AWS DMS легко налаштовується за допомогою JSON-конфігурацій та добре працює в парі з Infrastructure as Code (IaC) інструментом.
  • Те саме можна сказати про застосування Kafka Streams. По-перше, бібліотека не потребує окремої інфраструктури та інстансу, просто додати залежність і можна працювати. По-друге, порівнюючи з іншими stream processing frameworks, має дуже зручний DSL, тобто не заглиблюючись в деталі, можна швидко будувати та тестувати рішення.

Недоліки рішення

  • AWS DMS першочергово призначений для одноразових/короткочасних міграцій. Наприклад, мігрувати дані з PostgreSQL в Oracle і так далі. Використовувати його як CDC можна, але він не передбачений для постійного ETL-рішення. На це є ряд причин:
    • Обмежена кількість source та target. Інші CDC-рішення дають набагато більший вибір. AWS DMS покриває в основному сервіси AWS + пару баз даних.
    • На момент нашої реалізації AWS DMS таски було дуже важко дебажити та й взагалі вони були досить ненадійні. Виникало багато проблем, включаючи конфігураційні та внутрішні помилки AWS. Бувало таке, що конфігураційні помилки просто напросто видно лише в консолі (у форматі Something wrong), логів ніде немає. Фіксити потрібно було експериментальним методом. На AWS-форумах та Reddit пишуть, що приходилося навіть звертатися в сапорт, щоб виправити проблему. Таски час від часу відвалювалися незрозуміло чому, потрібно було їх перезапускати. Станом на зараз пишуть, що багато проблем виправили, проте особисто не перевіряв. В нашому випадку проблеми такого роду не були критичними.
    • AWS DMS навантажує базу даних. Кожен таск вичитує дані окремо, як варіант можна вичитувати декілька таблиць в одному таску, проте тоді ми втрачаємо ізольованість. Якщо щось піде не так, то ми не будемо вичитувати дані з декількох таблиць одночасно.
  • За замовчуванням AWS DMS має доволі низький ліміт на розмір оновлення та події, яка надсилається в target. У нас виникало декілька проблем, коли повідомлення просто обрізалося. Щоб вирішити цю проблему достатньо збільшити максимальний розмір в конкретному таску.
  • Kafka Streams зберігає стан (state) локально на диску. При базових налаштуваннях для збереження стану використовується база даних RocksDB. Вона оптимізована під збереження на локальному диску. У цьому сховищі зберігаються матеріалізовані таблиці (KTable), агрегати, join-и, кеші, і так далі. Якщо ви запускаєте сервіс в оркестраторах (ECS, Kubernetes, etc) — цей локальний state зникає при рестарті контейнера. Ми вирішували це за допомогою виділення окремого volume, який зберігається між контейнерами. Це дозволяє значно пришвидшити перезапуск інстансів, що реалізують Kafka Streams.
  • Потрібно визначити та слідкувати за оптимальною кількістю partitions для топіків, куди мають приходити оновлення від AWS DMS. Оскільки для масштабування Kafka Streams сервісу потрібно, щоб кожен інстанс опрацьовував принаймні 1 partition. Інакше ефекту недодавання нових інстансів не буде.

Ми дійшли висновку, що AWS DMS не є ідеальним рішенням для CDC — воно додає зайву складність в інфраструктуру, потребує постійного моніторингу. Тому попри початкову мету уникнути змін у моноліті поступово зменшуємо залежність від DMS і розглядаємо альтернативи. У деяких випадках все ж вирішили надсилати події безпосередньо з моноліту, свідомо приймаючи ризики, про які згадувалося раніше. Це не ідеальний підхід, але він простий та швидкий.

Тим не менш, використання CDC саме як проміжного етапу значно спростило та пришвидшило процес міграції та було виправданим. Наступні кроки — поступово зменшувати роль CDC у системі. Коли всі ключові сервіси будуть відокремлені, саме вони стануть джерелом бізнес-подій у Kafka. В ідеальному випадку це були б мікросервіси з ізольованою відповідальністю.

Дякую всім за увагу! Буду радий почути вашу думку про підхід та статтю, а також вислухати пропозиції та зауваження!

Джерела:

  1. martinfowler.com/...​anglerFigApplication.html
  2. en.wikipedia.org/wiki/Event_storming
  3. www.confluent.io/...​earn/change-data-capture
  4. microservices.io/...​transactional-outbox.html
  5. www.confluent.io/...​th-confluent-and-immerok
  6. www.oreilly.com/...​roservices/9781492047834
  7. popsink.com/...​n-and-change-data-capture
  8. rocksdb.org

Сподобалась стаття? Підписуйтесь на автора, щоб отримувати сповіщення про нові публікації на пошту.

👍ПодобаєтьсяСподобалось14
До обраногоВ обраному5
LinkedIn
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter

цікаво.

Single Message Transforms розглядали? тут ще трохи.

Kafka ж stateless, а описана логіка обробки записів з різних топіків в CDC Processor вже додає state, і виглядає як те що робить Apache Flink. можливо у майбутньому перейдете на нього, якщо процесинг буде ускладнюватися.

Дякую!
Цікава пропозиція, не розглядали на той час, не певен що ця річ була вже.
Бачу що там є обмеження по state, а також по join операціях. Нам би таке не підійшло.

і виглядає як те що робить Apache Flink. можливо у майбутньому перейдете на нього, якщо процесинг буде ускладнюватися

На жаль на проєкті вже не працюю =( Але як я писав Kafka Streams це тимчасове рішення для міграції, це не stream processing функціонал який нам потрібен. Тобто потреби у використанні stream processing у нас не було взагалі.

До того ж Kafka Streams як на мене швидше і простіше інтегрувати, те про що я згадував в перевагах.

Я дуже радий що AWS сервіс до розробки якого я причетний знайшов своє місце в статті на DOU ))

Добрий день.
Вас цікавлять деталі тестування, деплою та релізу чи процесу контролю умовного токену?
Якщо по деплою та релізу — то це нічого унікального, класичні GitHub Actions + ArgoCD, ну і ще пару інструментів.
А стосовно токену — то так як я описав. Команда брала токен коли були задачі на борді, які потрібно було тестувати на dev середовищі. Цей токен гарантував що лише ця команда може деплоїти на дев та в прод для того щоб уникнути конфліктів. Це все відбувалося в Slack каналі де були всі розробники моноліту та тім ліди.

Цікаво що за продукт, розкажете детальніше яку саме проблему він вирішує?

Дякую, цікаво!

Особисто в нашому випадку достатньо Slack каналу було, там можна створити чергу в каналі і проставити дати та час для кожного елемента в черзі. Тобто ми просто додаємо запис в кінець черги і час скільки нам потрібно для тестування. В кінці як ми завершили просто тегаємо в чаті наступну команду і все.

Мені здається що дедікейтед апка для такого це забагато). Якщо немає адекватного корпоративного месенджера то на крайній випадок можна і Google sheets використати, правда не так зручно.

Дякую за цікаву статтю!
З приводу Kafka Streams є пару питань.
Ось ці KTable:
— чи не буде проблемою, якщо таблиці дуже великі, відповідно треба локально розгорнути KTable теж доволі не маленький..
— якщо декілька зв’язаних записів оновилися у сорс базі в одній транзакції. Чи є гарантія, коли ми робимо Join через KTable, що отримаємо найбільш актуальний стан після коміту транзакції? Мені здається що ні, бо може статися що CDC для однієї з таблиць відпрацює швидше, а в KTable для зв’язаних таблиць ще будуть старі дані.

Дякую за коментар, дуже слушне запитання!

чи не буде проблемою, якщо таблиці дуже великі, відповідно треба локально розгорнути KTable теж доволі не маленький

Може бути проблемою, якщо є high-cardinality ключі. Тобто якщо дуже багато унікальних елементів в базі, під капотом вони будуть зберігатися на локальному диску інстансу Kafka Streams. Даних влізе стільки скільки є місця на жорсткому диску. Мені здається що там є retention policy для стану, кожен день видаляються старі дані. Також, RocksDB зберігає лише останній актуальний стан запису(по унікальному ключу) тому це треба мати прям серйозні об’ємми.

Ми пішли трішки іншим шляхом, під кожен інстанс для зберігання стану виділили в клауді EFS від AWS. Це дає швидший старт якщо інстанс впаде і треба підняти новий та збільшує розмір сховища.

якщо декілька зв’язаних записів оновилися у сорс базі в одній транзакції. Чи є гарантія, коли ми робимо Join через KTable, що отримаємо найбільш актуальний стан після коміту транзакції? Мені здається що ні, бо може статися що CDC для однієї з таблиць відпрацює швидше, а в KTable для зв’язаних таблиць ще будуть старі дані.

Взагалі немає гарантії, думаю що це проблема асинхронних систем в цілому. Така сама ситуація могла би виникнути якщо зв’язні дані контролються окремими мікросервісами.

В цілому, Kafka Streams може або почекати певний час і зробити join або взяти дані для join з свого локального сховища, про яке я згадував вище. Якщо подія ще не попала в Kafka-брокер взагалі як подія — то відповідно ніяк і не вийде зробити join, треба чекати.
В більшості випадків ми будемо робити join на найбільш актуальних даних які є або були в стані. Але є альтернативи:

  • задати вікно, щоб Kafka Streams почекало певний період часу для JOIN. Звісно що вікна може бути недостатньо, проте наврядчи в нормальних умовах у вас будуть постійно затримки
  • Знаю що ще додали фічу версійного сховища versionedkeyvaluestore
На практиці не виникало таких сценаріїв насправді, хоча їх важко відслідкувати.

Для JOIN операцій важливо врахувати цей момент зі статті також:

Потрібно визначити та слідкувати за оптимальною кількістю partitions для топіків

Топіки можна джойнити лише за умови що вони мають однакову кількість partitions. Якщо ця умова не виконується, то Kafka Streams дозволяє зробити проміжний топік.

Взагалі для JOIN операцій є перелік вимог для топіків. в Confluence є ресурси де вони гарно описують ці моменти.

— асинхроність
— реплікація
— історія
— аудіт
— контроль черги
— послаблення пов’язаності
— багато споживачів на один пуш

SQL це мова, Kafka це інструмент який добре скейлиться
Її можна використовувати як Service Bus, можна використовувати як append-only лог
Поверх Kafka є різні надбудови — наприклад Kafka Streams, Flink для обчислень, kSQLDB для SQL поверх кафки, etc

наприклад, ось www.linkedin.com/...​-7401647470696607744-EsJQ

> Powered largely by MySQL 8, our database fleet sustained 53.8 million queries per second and 4.28 billion row operations per second at peak

Kafka робить fan-out записам в БД, щоби вона не лягла від всіх бажаючих прочитати (більше тих процесів, що читають, ніж тих, хто пишуть). Консьюмери читають Kafka топіки замість запитів в БД.

Даже сумніваюсь що кафку вони використовують для скейла читання з бази. Для цього обирають реплікацію та кеши. В принципі вони і самі частково це підтверджують:
https://shopify.engineering/read-consistency-database-replicas

не для читання самого по собі, а для реагування на бізнес події, що трапляються в БД.

раніше використовували transactional outbox SMT для обраних таблиць, але потім експерименти з Flink довели що продуктові команди можуть отримати всю необхідну їм інфу через кастомні Flink SQL запити, з прийнятним latency.

клієнтів кафки ж цікавить, наприклад, не івент «варіант продукту доданий в корзину», а відразу бізнес подія створення ордеру + line items, і т.п.

Я в цілому згоден, але воно вже про ширшу проблему. Бо flink це надбудова і можна використовувати Кафка без цього. Transactional outbox і зараз використовують. «Наситити» данні в евенті також можна як спочатку, так і в обробнику, бо евентми про конкретні події і до бази будуть запити або по pk, або в гіршому випадку secondary index, ну чи flink/dms. Як краще, то в кожному конкретному випадку треба дивитись, ускладнювати архітектуру чи ні.

Можливо я зараз зламаю деякі уявлення, але він іноді навіть замість http використовується.
Взагалі скрізь, де потрібна контрольована «труба», використовують такі штуки

А потім «ринок жахливий, роботу неможливо знайти з 10 роками досвіду»... А насправді чуваки з 10 роками навіть не в змозі розібратися які інструменти є на ринку і як вони використовуються

Дякую, стисла, лаконічна і гарна стаття!

Дякую, радий що було корисно!

Підписатись на коментарі