Побудова гнучкої архітектури за допомогою Kafka і Kafka Connect

💡 Усі статті, обговорення, новини про DevOps — в одному місці. Приєднуйтесь до DevOps спільноти!

Вітаю, спільното. Мене звати Марк Норкін, маю понад 10 років досвіду в IT, з яких останні 7+ років займаюсь напрямом Data Engineering, зокрема побудовою різних Data Platform та рішень у сфері Data & Analytics. Зараз я займаю позицію Solution Architect у компанії GlobalLogic, де є частиною Technology Office компанії: зокрема, займаюсь pre-sales та advisory активностями для наших існуючих та потенційних клієнтів.

У світі, де дані створюються постійно й у великих обсягах, компанії шукають способи швидко, надійно й масштабовано передавати їх між системами. Важливою характеристикою для побудови таких систем є їхня гнучкість. І хоч в англійській літературі немає формальної дефініції терміну «гнучка архітектура», в рамках нашої розмови ми будемо розуміти під цим систему, яка легко адаптується до змін, росте разом із навантаженням і дозволяє інтегрувати нові компоненти з мінімальними зусиллями.

Одними з можливих інструментів для побудови таких систем є Apache Kafka, а також Kafka Connect — окремий компонент Kafka екосистеми для інтеграції з зовнішніми системами.

В цій статті ми розглянемо базові концепції та сутності по Apache Kafka та Kafka Connect, та в яких ситуаціях варто розглядати їхнє використання.

Базові основи з Apache Kafka

Перш ніж ми почнемо говорити про Kafka Connect, варто придиліти увагу Apache Kafka — технології, без якої Kafka Connect неможливий у використанні.

В рамках даної секції ми розглянемо лише базові речі по Apache Kafka, для більш детального розгляду цієї технології можна ознайомитись з іншими статтями на DOU, наприклад:

Також можу рекомендувати ознайомитись із наступними статтями, які демонструють приклади використання Apache Kafka:

Архітектура Apache Kafka

Отож, Apache Kafka це розподілена система по обробці подій у реальному часі. Основні характеристики цієї системи:

1. Вона є розподіленою, оскільки може бути розгорнута на декількох серверах, які називаються брокерами і які разом об’єднуються в кластер. При розгортанні кластеру зазвичай кількість таких брокерів є непарним числом (3/5/7/9).

2. З самого народження Apache Kafka має сторонню залежність на Apache ZooKeeper, який використовується для менеджменту кластеру, зберігання різного роду метаданих, секретів, ACLs. ZooKeeper є розподіленою системою і має також бути розгорнутий на декількох серверах, щоб забезпечувати fault tolerance. У більш нових версіях Apache Kafka (версії 3.3 і вище) цієї залежності можна уникнути, оскільки зʼявилась альтернатива у вигляді KRaft. І з версії Apache Kafka 4.0, яка вийшла 18 березня 2025 року, Zookeeper офіційно прибрали як залежність в цілому.

3. Kafka працює за моделлю Pub/Sub, де продюсери (producers) публікують події (повідомлення) в Apache Kafka, а консьюмери (consumers) потім їх читають. І продюсери і консьюмери є клієнтськими частинами. Наприклад, це може бути ваш Java-застосунок, який за допомогою kafka java client пише в Apache Kafka.

4. Брокери зберігають повідомлення на диску, забезпечуючи їхню надійність (durability) у випадку збоїв:

  • Ця якість також дозволяє повторно вичитувати ці повідомлення у майбутньому.
  • Термін збереження повідомлень на диску можна налаштовувати за двома варіантами по часу та по розміру даних.

Основні сутності в Apache Kafka

Пропоную розглянути такі основні сутності в Apache Kafka: Producers, Consumers, Topics, Partitions, Segments, Messages.

Messages/Records/Events

У Kafka всі дані передаються у форматі повідомлень (з англ. message, але record/event також зустрічаються у документації як синоніми).

1. Кожне повідомлення складається з метаданих і тіла.

  • З погляду метаданих є дані, які Kafka-клієнт формує сам (topic, partition, timestamp). І також є можливість додати свої власні як headers.
  • З погляду тіла самого повідомлення, воно складається з Key Value пари, де ключ є опціональним.

2. І ключ, і значення можуть мати будь-який тип даних (рядок, число, обʼєкт і так далі). Kafka використовує механізм Серіалізації, і тому в результаті дані, що відправляються з клієнта і зберігаються, є просто масивом байт.

Topics

Самі повідомлення надсилаються і потім зчитуються з конкретного Topic, який виступає логічною структурою. Користувачі можуть створювати довільну кількість топіків і налаштовувати їх під свої потреби. З погляду запису та читання, в один і той самий topic можуть писати багато продюсерів і читати багато консьюмерів. Дані ж, які записуються в topic, надалі є незмінними (immutable).

Partitions

Кожен Topic складається з визначеної кількості партицій (partitions). Їх кількість задається при його створенні, і надалі може тільки збільшуватись. Партиції є важливими з декількох причин, зокрема:

1. Kafka гарантує порядок повідомлень лише в рамках партиції. Тобто якщо ви записуєте повідомлення з одним і тим самим Ключем (або явно вказуєте партицію для запису на стороні продюсера), то ви зможете вичитати ці повідомлення в тому самому порядку, в якому їх записали. Серед цікавого є відомий приклад, де американська газета New York Times створила топік з однією партицією, щоб зберегти історію всіх випусків газет з дати заснування у порядку їх публікації.

2. Кафка забезпечує реплікацію даних саме на рівні партицій. Кількість копій (replication factor) конфігурується на рівні топіка на момент його створення.

3. Партиції є елементом паралелізму: чим більше партицій є у топіка, тим більше консьюмерів зможуть вичитувати кожен свою партицію в паралель. Але до вибору цього magic number все-таки варто підходити більш сфокусовано. В цій статті більше пояснюється ця проблематика.

Segments

Кожна партиція, своєю чергою, поділяється на сегменти. Сегмент — це конкретний файл на файловій системі, який зберігає в собі повідомлення, записані в цю конкретну топік-партицію. У кожен момент часу є лише один активний сегмент в партиції, в який відбувається запис даних. Сегмент буде активним, і дані будуть в нього записуватись, допоки не буде досягнуто певного розміру сегмент-файла чи не пройде максимальний час активності сегмент-файлу.

Producers

Продюсери займаються записом даних в Apache Kafka. Кожен продюсер може записувати дані в різні топіки та партиції. На стороні продюсерів також відбувається об’єднання повідомлень в пакети (batch) перед їх відправкою. Є змога налаштовувати розмір цих пакетів за допомогою параметрів batch.size та linger.ms. З погляду гарантій, що повідомлення було доставлено до брокера та записано, є налаштування acks на стороні продюсера.

Consumers

Консьюмери займаються читанням повідомлень з Apache Kafka.

Consumer Groups

Важливим елементом дизайну є консьюмер-групи (consumer groups). Кожен консьюмер є частиною конкретної групи консьюмерів, яка була вказана при створенні цього консьюмера за допомогою налаштування group.id.

З погляду топік-партиції тільки один конкретний консьюмер може читати дані з цієї партиції в межах однієї консьюмер-групи. Як наслідок якщо, для прикладу, у вас у топіку 10 партицій, то немає сенсу створювати більше 10 консьюмерів у рамках консьюмер-групи, оскільки одинадцятий простоюватиме.

Consumer Offset

Кожне повідомлення, записане в топік-партицію, має свій "ідентифікатор«/offset. Таким чином:

Коли консьюмер починає роботу, він вказує, з якого офсету хоче почати читання повідомлень з кожної топік-партиції. У випадку, якщо це нова консьюмер-группа і ще не було вичитано жодних даних з топіку, або якщо вам не важливий мануальний контроль офсетів, оскільки ви використовуєте auto commit функціональність — в цих обох сценаріях можете інструктувати, звідки ви хочете почати вичитку даних: з початку чи з кінця (якщо вас цікавлять тільки нові дані, що прийдуть) за допомогою конфігурації auto.offset.reset.

Коли консьюмер обробив повідомлення, він має повідомити брокера, надіславши йому підтвердження про це за допомогою команди commit. Брокер зберігає прочитані офсети від всіх консьюмер-груп в окремому внутрішньому топіку __consumer_offsets.

(Consumer) Message Delivery Semantics

З точки зору гарантій доставки повідомлень на стороні консьюмерів можна реалізувати всі три опції:

У випадку at most once ви можете використати autocommit функціональність, яка є з коробки в Apache Kafka консьюмер-клієнтах. За необхідності ж реалізації at least once та exactly once сценаріїв на стороні консьюмера треба буде надсилати підтвердження до брокера, що повідомлення було прочитано. Наприклад, в Java-клієнті за це відповідає команда commitSync. У варіанті з at-least-once концептуально код виглядатиме наступним чином:

// example config with disabled auto commit to enable at-least-once delivery semantics 

private static KafkaConsumer<String, String> createConsumer() {
   String consumerGroup = "your_cool_static_consumer_group_name";
   Properties props = new Properties();
   props.put("group.id", consumerGroup);
   props.put("enable.auto.commit", "false");
   // other properties ommitted for brevity
   return new KafkaConsumer<String, String>(props);
}

// example at-least-once data processing

while (true) {
   ConsumerRecords<String, String> records = consumer.poll(100); // read
   process(records); // process
   consumer.commitSync(); // commit
}

Як Kafka забезпечує масштабованість та надійність

Дизайн і архітектура Apache Kafka дозволяє використовувати її у високонавантажених системах для обробки потокових даних.

З погляду масштабованості Apache Kafka дозволяє гнучко масштабувати компоненти:

  • З’явилось нове джерело даних? Можна створити нового продюсера, який буде надсилати дані з цього джерела до Apache Kafka.
  • Брокери перевантажені (CPU/Disk/Network)? Можна збільшити їх кількість.
  • Консьюмери повільніше обробляють дані, що надходять (consumer lag)? Можна збільшити їх кількість до максимальної кількості партицій, з яких йде читання. Після цього все ще потрібна більша паралелізація обробки? Можна збільшити кількість партицій в топіку (пам’ятаючи при цьому про порушення порядку повідомлень).

З погляду надійності Apache Kafka дозволяє:

  • Продовжити роботу навіть якщо якийсь з компонентів відвалився (брокер/нода Zookeper, продюсер, консьюмер).
  • З погляду роботи з даними, оскільки:
    • вони зберігаються на диску;
    • ви можете налаштовувати їх реплікацію;
    • маєте змогу «відкатуватись» на потрібний consumer offset.
  • З огляду на це, якщо є така потреба, ці дані можуть бути знову перевичитані та оброблені у разі збою.

Роль Kafka Connect у гнучкій архітектурі

У минулій секції ми розглянули основи Apache Kafka. З погляду архітектурних підходів ця технологія завдяки розглянутим властивостям може використовуватись в подійно-орієнтованому та сервісному підходах, у потоковій обробці (стрімінгу) та монолітних рішеннях.

З іншого боку, Kafka Connect — це технологія, що дозволяє інтегруватися зі сторонніми системами:

  • вичитуючи з них дані та надсилаючи їх в Apache Kafka;
  • надсилаючи дані з Apache Kafka в інші сторонні системи.

Тобто якщо розглянути типовий streaming сценарій:

То у випадку з ним Kafka Connect може використовуватись і для Data Ingestion, і для Stream Processing/ETL-компонентів:

Архітектура Kafka Connect

Kafka Connect дозволяє налаштувати інтеграцію між Apache Kafka та іншими системами як конфігурацію, а не програмування. Завдяки цьому вона дозволяє зосередитися на логіці обробки даних, а не на «клейовому» коді для переміщення цих даних.

Workers

Kafka Connect працює незалежно від брокерів Apache Kafka і може бути розгорнутий як на одному хості (standalone mode) у вигляді окремого застосунку, так і на кількох хостах (distributed mode) для утворення розподіленого кластеру. Хост, на якому запущено Kafka Connect, називається воркером. Розгортання декількох коркерів в кластері дозволяє забезпечити масштабованість і відмовостійкість.

REST API

Кожен воркер Kafka Connect має вбудований вебсервер і може приймати запити через REST API. Для координації в кластері Kafka Connect автоматично обирає одного з воркерів як лідера — він відповідає за розподіл задач і управління станом кластера. Якщо REST API запит надходить до воркера, який не є лідером, але цей запит змінює стан кластера, він буде автоматично перенаправлений до лідера. У продакшн-середовищі зазвичай використовують балансувальник навантаження (load balancer), який розподіляє запити по REST API між усіма воркерами.

State Management

Kafka Connect зберігає свій стан в окремих топіках Apache Kafka. Їх значення можна сконфігурувати через файл connect-distributed.properties (Приклад):

bootstrap.servers=<Kafka брокери>
group.id=<kafka-connect-cluster-unique-group-id>
config.storage.topic=<topic для зберігання конфігурації конекторів>
offset.storage.topic=<topic для зберігання офсетів source конекторів>
status.storage.topic=<topic для зберігання статусів конекторів>
# інші загальні налаштування Kafka Connect кластеру

Також з точки зору Sink-конекторів є можливість налаштування dead-letter-queue топіку, у який буде записуватись повідомлення у разі неможливості його відправки в сторонню систему:

errors.tolerance = all
errors.deadletterqueue.context.headers.enable=true
errors.deadletterqueue.topic.name = <dead-letter-topic-name>

У випадку, якщо ви розгортаєте декілька Kafka Connect кластерів (наприклад, для ізоляції навантаження, ізоляцію доступу через політики безпеки, організаційну ізоляцію у випадку роботи різних команд і так далі), які працюють з одним й тим самим Apache Kafka кластером брокерів — ви також маєте розмежувати connect-distributed.properties конфігурації кожного з кластерів, інакше вони конфліктуватимуть і можуть перезаписувати стан один одного.

Security

Last but not least, Kafka Connect також має security функціональність:

  1. RBAC-модель, якщо ви використовуєте Kafka Connect у Confluent Cloud та простішу ACL-модель, якщо ви використовуєте open-source версію Kafka Connect.
  2. Можливість інтеграції зі сторонніми менеджерами секретів через функціональність ConfigProvider’ів. Open-source версія Kafka Connect «в коробці» йде лише з базовими провайдерами конфігурації, такими як файлові провайдери. Тоді як у Confluent Cloud також є провайдер на основі Secret Registry. Рідна підтримка хмарних менеджерів секретів, таких як AWS Secrets Manager, Azure Key Vault або HashiCorp Vault, не входить до open-source версії Kafka Connect. Інтеграцію з цими менеджерами секретів потрібно здійснювати за допомогою сторонніх плагінів від комʼюніті, які реалізують інтерфейс ConfigProvider і можуть бути встановлені та налаштовані окремо.
  3. Автентифікація за допомогою TLS/SSL, SASL/GSSAPI для Kerberos, SASL/SCRAM та SASL/PLAIN у випадку username/password автентифікації.

Source та Sink конектори, їхні основні налаштування

Конектори поділяються на два типи:

  • Source-конектори тягнуть дані зі сторонніх систем в Apache Kafka.
  • Sink-конектори, які навпаки тягнуть ці дані з Apache Kafka в сторонні системи.

Один і той самий тип конектора (наприклад, JdbcSourceConnector) може бути використаний кілька разів в одному кластері Kafka Connect з різними конфігураціями.

Це дозволяє, наприклад, читати дані з різних баз даних (PostgreSQL, MySQL, MSSQL тощо):

{
  "name": "jdbc-mysql-connector",
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:mysql://db-host1:3306/mydb",
  ...
}
{
  "name": "jdbc-postgres-connector",
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:postgresql://db-host2:5432/anotherdb",
  ...
}

Стосовно конфігурації самих конекторів: параметри, вказані в connect-distributed.properties, які ми розглядали вище, стосуються всіх воркерів і всієї інфраструктури Kafka Connect, а не окремих конекторів. Для конфігурації ж конекторів вже надалі використовується REST API:

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "my-jdbc-source",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "connection.url": "jdbc:postgresql://localhost:5432/mydb",
      "mode": "incrementing",
      "incrementing.column.name": "id",
      "topic.prefix": "db-",
      "tasks.max": "5"
    }
  }'

Конектори можуть додаватися у вже піднятий Kafka Connect кластер, так само як і видалятися з нього.

Tasks

Task (завдання) — це одиниця роботи, яка виконує частину логіки конектора. Вони запускаються паралельно всередині Kafka Connect для масштабування. Коли ми створюємо інстанс конектора (приклад вище), ми вказуємо максимальну кількість задач в рамках цього інстансу за допомогою tasks.max. Сам конектор (клас) реалізує метод:

List<Map<String, String>> taskConfigs(int maxTasks)

У цьому методі конектор сам вирішує, як розбити загальне завдання на maxTasks частин.

Kafka Connect розподіляє Tasks по доступним воркерам у кластері. Tasks можуть бути перезапущені, переміщені тощо — кластер сам керує цим.

З погляду виконання, якщо Task не виконався успішно, його можна перезапустити через REST API:

POST /connectors/<CONNECTOR_NAME>/tasks/<TASK_ID>/restart # перезапустити конкретний task
POST /connectors/<CONNECTOR_NAME>/restart?includeTasks=true&onlyFailed=true #перезапустити конектор та тільки задачі що завершились з помилками
POST /connectors/<CONNECTOR_NAME>/restart # перезапустити тільки конектор (наприклад після внесення змін в його конфігурації, щоб він їх підхопив)

Огляд популярних конекторів

З погляду доступних Source-конекторів є можливість інтегруватися, зокрема, з наступними джерелами даних:

  • AWS-сервісами: S3, Kinesis, SQS, CloudWatch Logs.
  • Google Cloud: Storage, Pub/Sub, Firebase.
  • Azure: Event Hubs, Blob Storage, Service Bus.
  • MongoDB, RabbitMQ, JDBC (MySQL, PostgreSQL, SQLServer), HDFS, SFTP, HTTP.

З погляду Sink-конекторів є можливість запису, зокрема, в наступні системи за допомогою Kafka Connect:

  • AWS: S3, Lambda, Redshift, DynamoDB.
  • Google Cloud: BigTable, Storage, BigQuery.
  • Azure: DataLake Storage Gen1/2, Synapse Analytics, Functions.
  • ElasticSearch, RabbitMQ, HDFS, Cassandra, Apache HBase, Redis, SFTP, HTTP.

З повним списком доступних конекторів та іншою інформацією, зокрема їх ліцензіями, можна ознайомитись тут.

Основні сутності Kafka Connect

Основними сутностями Kafka Connect, з якими працює розробник під час побудови пайплайну, є конвертори, предикати та трансформації.

Converters

Конвертер (Converter). Якщо у випадку з Apache Kafka ми говорили про сутність (De)Serializers, то Kafka Connect використовує інший механізм — Конверторів — для конвертації своїх даних в масив байт і назад.

Конвертер так само вказується як для ключа, так і для значення. Це значення можна глобально вказати для всіх конекторів через connect-distributed.properties:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

І також потім конфігурувати при створенні конектора у випадку відмінності від глобальних налаштувань:

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "my-sink-connector",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://schema-registry:8081",
      ...
    }
  }'

Transformations (Single Message Transforms)

Трансформації, на відміну від конектору та конвертерів, є опціональним компонентом при побудові пайплайну. Вони дозволяють трансформувати кожне повідомлення окремо, одне за одним.

Типовими прикладами трансформацій є зміна типу даних або структури, маршрутизація повідомлення залежно від вмісту, видалення/маскування чутливої інформації.

Під час створення пайплайну ви не обмежені однією трансформацією, а навпаки можете поєднувати їх у рамках тої самої обробки.

Розглянемо приклад, де потрібно зробити декілька дій над повідомленнями, які приходять з реляційної бази даних:

  1. Замаскувати поле password.
  2. Перейменувати поле created_at в CreatedAt.
  3. Додати поле з timestamp в моменті обробки.
  4. Зробити маршрутизацію повідомлень в топіки за конвенцією cleaned.<table>.
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "mysql-users-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      ...
      "transforms": "MaskPassword,RenameField,AddProcessingTime,RouteTopic",
      "transforms.MaskPassword.type": "org.apache.kafka.connect.transforms.MaskField$Value",
      "transforms.MaskPassword.fields": "password",
      "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
      "transforms.RenameField.renames": "created_at:createdAt",
      "transforms.AddProcessingTime.type": "org.apache.kafka.connect.transforms.InsertField$Value",
      "transforms.AddProcessingTime.timestamp.field": "processed_at",
      "transforms.RouteTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
      "transforms.RouteTopic.regex": "mysql_server.app_db.(.*)",
      "transforms.RouteTopic.replacement": "cleaned.$1"
    }
  }'

Порядок виконання трансформацій буде у порядку їх згадування в полі transforms.

Список усіх трансформацій «з коробки» доступний тут.

Predicates

Предикат — це умова, подібна до if-перевірки, яка визначає, чи слід застосовувати певну трансформацію до повідомлення. Його конфігурують разом із трансформацією, і лише якщо ця умова виконується, трансформація буде застосована.

Наприклад, якщо ми хочемо виконувати трансформацію MaskPassword (як у прикладі вище) лише в тому випадку, коли повідомлення стосується сутності users:

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "mysql-users-connector",
    "config": {
       "connector.class": "io.debezium.connector.mysql.MySqlConnector",
       "transforms": "MaskPassword,RenameField,AddProcessingTime,RouteTopic",
       ...
       "predicates": "IsUsersTable",
       "predicates.IsUsersTable.type":
       "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
       "predicates.IsUsersTable.pattern": ".*users$",
       "transforms.MaskPassword.type":
       "org.apache.kafka.connect.transforms.MaskField$Value",
       "transforms.MaskPassword.fields": "password",
       "transforms.MaskPassword.predicate": "IsUsersTable",
    }
  }'

В цьому прикладі предикат IsUsersTable перевіряє, чи назва топіка закінчується на users — тобто app_db.users, mysql_server.app_db.users тощо. Трансформація MaskPassword буде застосована тільки якщо предикат спрацює (тобто джерелом є таблиця users). Інші SMT (RenameField, AddProcessingTime, RouteTopic) застосовуються до всіх записів без умов.

Також Kafka Connect дозволяє фільтрувати повідомлення за допомогою комбінування предикатів та Filter-трансформації.

Переваги та Недоліки Kafka Connect

Серед основних переваг використання Kafka Connect можна виділити наступні:

  1. Спрощена інтеграція з зовнішніми системами. Є велика вибірка конекторів, доступних для використання з найбільш популярними джерелами та приймачами даних. Всі конектори працюють за єдиною архітектурною моделлю, що забезпечує стандартизацію інтеграцій. Всі конектори налаштовуються за допомогою JSON та конфігурації через REST API Kafka Conneсt кластеру і не вимагають написання низькорівневого коду.
  2. Моніторинг і керування. Kafka Connect має вбудовану підтримку REST API для моніторингу стану конекторів, перегляду логів, управління завданнями та діагностики.
  3. Покриття таких нефункціональних вимог як масштабованість, надійність і відмовостійкість. Розгортання воркерів у кластері дозволяє не лише обробляти більші обсяги даних (шляхом додавання воркерів), а й забезпечує безперебійну роботу, якщо частина з них вийде з ладу.

Серед недоліків та обмежень використання Kafka Connect можна виділити наступні:

  1. Архітектура вашого продукту має підходити під використання Kafka Connect. Зокрема це значить, що ви у вашому продукті вже обрали підхід, де будуєте саме потокову (streaming) обробку даних через інтеграцію з різними джерелами та приймачами даних. І для потокової обробки ваш продукт вже використовує Apache Kafka, а не якесь інше рішення.
  2. Ваші джерела даних підтримуються Kafka Connect. Kafka Connect побудований на pull-моделі отримання даних, тобто конектор сам періодично звертається до джерела. Тому якщо у вашому продукті використовуються push-based джерела (наприклад, дані надходять через WebSockets або Server-Sent Events — SSE), Kafka Connect їх не підтримує.
  3. Бізнес-логіка вашого продукту вимагає більш складної обробки даних. Це, зокрема, обмежує використання Sink-конекторів, оскільки після вичитування з Apache Kafka подальша Stream Processing/ETL-логіка часто є складнішою, ніж stateless-трансформації, які доступні через Single Message Transforms в Kafka Connect. Тому дуже часто на порятунок тут приходять такі інструменти, як Apache Spark Streaming, Apache Flink і їм подібні.
  4. Існуючі конектори мають підходити під вимоги вашого бізнесу. Тут можуть грати роль різні фактори, наприклад, існуючий конектор:
    1. не реалізовує потрібну вам семантику доставки повідомлень (at-most-once/at-least-once/exactly-once);
    2. має ліцензію, яка вам не підходить;
    3. має критичні вразливості та/або його performance не є задовільним.
  5. Написання своїх конекторів не є тривіальною задачею. Як приклад, можна подивитись на код існуючого конектора, який пише в AWS S3 S3SinkConnector та його основної бізнес логіки, яка реалізована в класах S3SinkTask та TopicPartitionWriter. Концептуально запис даних з Apache Kafka в AWS S3 здається простою задачею. Проте з погляду реалізації, як бачимо, все не так легко, щоб прикладні інженери могли успішно впровадити це для вашого продукту.

Альтернативи Kafka Connect

Серед альтернатив Kafka Connect найбільш розповсюдженими, з особистого досвіду, є використання Apache Nifi та написання своїх кастомних застосунків.

Apache Nifi має PublishKafka процесор (або його версія в Minifi), який під капотом так само використовує Apache Kafka Producer клієнта для відправки даних в Apache Kafka.

Є більш вузькі рішення, наприклад:

  • Використання Debezium окремо від Kafka Connect через Debezium-Engine, або Debezium-Server, якщо ви використовуєте не Apache Kafka, а іншу чергу повідомлень.
  • Flink CDC для реалізації Change Data Capture підходу в поєднанні з Apache Flink.

Kafka Connect доступний також від різних вендорів з різною комерційною складовою, ліцензією та сумісністю між ними:

Висновки

Отже, у цій статті ми детально розглянули архітектуру та основні сутності Apache Kafka, такі як повідомлення, топіки, партиції та сегменти, а також принципи роботи продюсерів і консьюмерів. Ми проаналізували, як Kafka забезпечує масштабованість і надійність у високонавантажених системах. Особливу увагу було приділено ролі Kafka Connect у побудові гнучких архітектур, його компонентам, включаючи воркери, конектори (Source та Sink), конвертори, трансформації та предикати. Насамкінець, ми обговорили переваги та недоліки використання Kafka Connect, а також розглянули доступні альтернативи для різних сценаріїв інтеграції даних.

Чи варто використовувати Apache Kafka разом із Kafka Connect у вашому рішенні? У поєднанні ці інструменти можуть суттєво спростити інтеграцію з зовнішніми системами й забезпечити прозору, масштабовану модель обміну даними. Втім, як це часто буває у світі ІТ у відповіді на будь-яке питання — it depends. Остаточне рішення варто приймати, враховуючи бізнес-вимоги, технічні обмеження, витрати на впровадження та підтримку, наявну експертизу в команді та інші фактори.

Мали досвід з Kafka Connect для передачі даних з Apache Kafka або тільки плануєте впровадження? Поділіться своїми думками та кейсами в коментарях — буде цікаво обговорити.

👍ПодобаєтьсяСподобалось15
До обраногоВ обраному9
LinkedIn
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
  • Чи подобається вам Кафка?
  • Так, грефяна офобливо...

Блін. Я так багато років тому останній раз чув цей жарт що спочатку довго не міг зрозуміти, у чому прикол з Grafana.

я собі на амазоні купив був 2 мішечки по 20 фунтів (здається) підварюю по троху вона полєзна полєзніша окремо взято за рис

Це суперфудом вважається. Особливо необсмажена.

І хоч би в одній статті розказали про нюанси які вилазять коли івент довго обробляється та способи вирішення цієї проблеми.
Всі з тим стикаються переважно вже у продакшині😆

Що не стаття, то переказ документації.

Дякую за зауваження.
Так, тривала обробка повідомлень — справді важливий аспект, який може створити проблеми в продакшн-середовищі, особливо якщо йдеться про виклики сторонніх сервісів, складну бізнес-логіку чи важкі трансформації.

У цьому матеріалі фокус імплементації клієнтської частини був на Kafka Connect, який переважно використовується для задач data movement і працює з lightweight stateless трансформаціями (SMT). За своїм дизайном SMT не передбачають тривалої або блокуючої обробки, тому відповідна проблема менш актуальна саме в контексті Kafka Connect.

Звісно, у випадках кастомних застосунків із комплексною обробкою подій — наприклад, коли присутній Complex Event Processing або виклики зовнішніх сервісів під час обробки — тривалість обробки стає критично важливою. У таких сценаріях справді варто приділяти більшу увагу налаштуванням kafka клієнта, паралелизму, моніторингу, і загалом тестуванню негативних сценаріїв (сервіс відвалився тощо).

Дякую, що підсвітили цей момент — можливо, окрема стаття з прикладами з продакшену була б корисною для спільноти.

У випадку at most once ви можете використати autocommit функціональність

Вибачайте, але це, мняко кажучи, неправда.

Бо в кафці взагалі, по суті, нема (гарантованого) at most once.
Бо єдине налаштування автокоміта — це за часовим інтервалом.

Тобто, якшо, припустимо, полл зчитав 10 рекордів, автокоміт стоїть на 100мс, 5 встигли пропроцесить за 70мс — а потім компонент наїбнувся.
То коли він підніметься знов — ті 5 будуть прочитані ще раз.

Таким чином, ви абсолютно ЗАВЖДИ маєте бути готовим до at least once, PERIOD.
Тобто, мати явну чи неявну дедуплікацію сутностей в вашій системі шо споживає ці мессаджі.

Kafka — це абсолютно жахлива, косойобна, на-грані-повної-зневаги-до-користувача реалізація простої і, загалом, непоганої ідеї.

Особливий сморід випромінює їхній офіційний Java client.

І навіть обгортки типу Spring Kafka не надто допомагають (бо сам Spring теж... але то інший топік).

Найбільша ***ня в тому шо альтернатив-от, м’яко кажучи, небагато.

то для чого обсирати , якшо альтернатив нема ?))

Потому что я привык презирать глупость, а не соперничать в ней. © Цицерон

Чомусь у багатьох наших співвітчизників такий підхід, що коли їх заставляють жерти гівно, то вони вважають за потрібне дякувати і нахвалювати це гівно.

Напишіть краще. Чи у вас часу на це немає, за це ніхто не заплатить, у вас своєї роботи валом, не цікаво та ще мільйон інших відмазок, які тільки можна вигадати?
Можете на-Go-внякати навіть (вибачте, не втримався, не маю претензій до вас персонально, просто вираз дуже смачний)

Так проблема ж не в «сісти на написати своє». Це реалістично і з наполегливим підходом дійсно можна зробити краще. Та тільки кому воно буде потрібно? Конфлюент побудував таку екосистему навколо Кафки, що навіть якщо Кафка остаточно перетвориться в кучу лайна, то люди ще з десяток років по інерції будуть її використовувати.

Та тільки кому воно буде потрібно?

Ви зараз серйозно? Як мінімум тим особам, яким не подобається Kafka, але поки що кращої альтернативи немає.

Конфлюент побудував таку екосистему навколо Кафки, що навіть якщо Кафка остаточно перетвориться в кучу лайна, то люди ще з десяток років по інерції будуть її використовувати.

Яка різниця що там побудував Конфлюєнт? Якщо ви зробити конкурентноздатний продукт, то у вас буде власна екосистема.

Ви наче сьогоднішній, капець. Конкурентноздатний не значить найкращий. Найкращий не значить конкурентноздатний.

Ви можете написати найкращу альтернативу Кафкі, та тільки усім буде пофіг, бо кафка — це сотні і тисячі спеціалістів, сотні годин навчальних матеріалів, сотні книг, сертифікації, аудити і тд. Бізнес просто так не працює.

Конкурентноздатний не значить найкращий. Найкращий не значить конкурентноздатний.

І які висновки ми мусимо зробити з цього філософського трактату?

Ви можете написати найкращу альтернативу Кафкі, та тільки усім буде пофіг

Ви зараз продовжуєте всім розповідати про причини, чому цього робити не треба. А треба думати про те, як зробити це можливим. Навіть коли люди наведуть вам реальні приклади, як зробити проект, а тим паче високоякісний та конкурентноспроможний, популярним, ви знайдете черговий пакет відмазок.

Навіть коли люди наведуть вам реальні приклади, як зробити проект, а тим паче високоякісний та конкурентноспроможний, популярним

Коли хоч натяк на це буде, тоді є сенс про це говорити. А поки я бачу тільки закидони з розряду «спєрва дабєйся», «сдєлай лудше» і тд., коли обʼєктивно те, що ти там для себе сам зробиш, можливо, і буде кращим, але нікому, крім тебе, не потрібним.

Лови

  • Відвідувати профільні івенти, розказувати про переваги продукта перед конкурентами
  • Популярізувати продукт на форумах, в профільних групах соцмереж
  • Написати ряд статей та опублікувати на відповідних ресурсах
  • Купити рекламу
  • Найняти професійних рекламщиків
Достатньо?

Таке гівно, що це майже стандарт в індустрії.🤦‍♂️

так і жабаскрипт, fucking abomination designed by reptiloids for reptiloids — стандарт

і це вже десятиліттями так

еммм... ну, типу, можна й так
можна навіть й AMQP брокер, казалось би

але пульсар — це більше ніж immutable distributed log (кажу ж — за кафкою дуже проста ідея)
і він складніший в експлуатації, там більше компонент і складніша модель

чи варто городити город для більшості простих кейсів, та ще й на додачу отримати гірший throughput через надмірну складність яка тобі не потрібна?

чи варто городити город для більшості простих кейсів, та ще й на додачу отримати гірший throughput через надмірну складність яка тобі не потрібна?

На рахунок складніший — згоден.

та ще й на додачу отримати гірший throughput через надмірну складність яка тобі не потрібна

На всіх бенчмарках які я гортав, пульсар швидший, чи я щось не так зрозумів?

ну конфлюєнт каже що на 200к кафка швидша

ггг

З погляду топік-партиції тільки один конкретний консьюмер може читати дані з цієї партиції в межах однієї консьюмер-групи.

Стоит упомянуть, что в 4-ой Кафке наконец-то подвезди кьюшки, так что с share group теперь можно иметь несколько консюмеров на партицию

Так, дякую за влучне зауваження. Дійсно разом з KIP-932 cwiki.apache.org/...​ueuesforKafka-Earlyaccess з’явилась нова семантика «кооперативної» вичитки повідомлень, хоч поки що ця функціональність в Early Access, і не рекомендована для продакшн середовищ.

В окремих сценаріях, де потрібна саме семантика черги повідомлень, це має дати змогу будувати більш еластичні, адаптивні системи: наприклад, в пікові часи можна динамічно додати консьюмери до групи — і вони всі отримають частину нових повідомлень з тих же партицій. Буде цікаво побачити її впровадження.

Підписатись на коментарі