Сучасна диджитал-освіта для дітей — безоплатне заняття в GoITeens ×
Mazda CX 30
×

Починаємо роботу з Apache Kafka. Частина II

Підписуйтеся на Telegram-канал «DOU #tech», щоб не пропустити нові технічні статті

Всім привіт. Я Сергій Моренець, розробник, викладач, спікер та технічний письменник. Хочу поділитися з вами своїм досвідом роботи з такою цікавою технологією як Apache Kafka та розкрити ті теми, які з нею пов’язані. У попередній статті я розповів про внутрішні особливості Kafka, її життєвий шлях і основних суперників на ринку messaging sys-tems. Якщо ви твердо вирішили використовувати Kafka у своїх додатках, то тепер потрібно ознайомитися з тим, як надсилати та отримувати повідомлення за допомогою Kafka.

У цій статті ми поговоримо про те, як надсилати повідомлення з Java-додатків. Наприклад, візьмемо абстрактну систему, побудовану на мікросервісах, в якій платіжний сервіс управляє платежами. І кожен платіж (успішний чи невдалий) повинен призводити до відправки події (нотифікації), для чого ми і будемо використовувати Kafka.

Конфігурація та деплоймент

У попередній статті ми говорили про те, що в Kafka 2.8 з’явився новий механізм роботи, в якому немає місця Zookeeper, а його роль контролера займе так званий кворум контролер на основі тієї ж Kafka. При чому, зручно для локального використання та тестування, що той самий Kafka сервер-може бути і брокером, і контролером. Оскільки цей механізм, швидше за все, стане основним, то цікавіше саме його і використовувати для розгляду.

Для запуску Kafka достатньо завантажити та запустити бінарні файли. Але якщо ви використовуєте Windows, то ви можете стикнутися з тією проблемою, що, як пише документація «Kafka is not intended to be run on windows natively and has several issues that may arise over time». Справа в тому, що Kafka використовує специфічні функції POSIX для своєї роботи. Тому в компанії Confluent прямо рекомендують використовувати або WSL 2, або Docker для запуску Kafka на Windows. Тому ми в цій статті візьмемо Docker/Docker Composе, адже вони дозволяють досить легко автоматизувати запуск і конфігурацію і підходять для будь-якої ОС.

Але якщо ви захочете використовувати новий механізм та Docker-оточення, то зіткнетеся з першою складністю. Docker-образ для Kafka містить bash-скрипти, які запускаються при старті сервісу і перевіряють коректність різних налаштувань. Одна з них — наявність спеціальної якості zookeeper.connect. До Kafka 2.8 його потрібно було обов’язково вказувати як список Zookeeper-серверів (IP-адреса та порт), до яких Kafka брокери підключалися при старті. Після 2.8 ця властивість стала опціональною, але bash-скрипти в Docker не були адаптовані до нових змін. І якщо ви його не вкажете в kafka.properties або в змінних оточення, то Kafka-брокер просто не запускатиметься. У цієї проблеми немає простого рішення, крім модифікації цих bash-скриптів на той випадок, коли ви не використовуєте Zookeeper (або написати свій Dockerfile). Тому нічого не залишається, як усунути перевірки, по суті, зробивши патч для цих скриптів. Це workaround, якого ми позбудемося, як тільки ситуацію виправлять на рівні Kafka. Аналогічна проблема існує і в тих Docker images, які надає популярний сервіс Bitnami

Таких скриптів два — configure та ensure. Скопіюємо їх зі змінами з Docker-образу Kafka в папку проєкту docker-scripts/kafka і заодно додамо Dockerfile, який буде перетирати існуючі скрипти:

 FROM confluentinc/cp-kafka:7.1.1
 
USER root
 
ADD configure /etc/confluent/docker/configure
ADD ensure /etc/confluent/docker/ensure
RUN chmod -R 777 /etc/confluent/docker/
 RUN echo "kafka-storage format --ignore-formatted -t $(kafka-storage random-uuid) -c /etc/kafka/kafka.properties" >> /etc/confluent/docker/ensure

Як ви бачите, ми використовуємо образи Docker від компанії Confluent. cp-kafka — це образ для безкоштовного використання, а cp-server — для комерційного, тому ми використовуємо перший. Крім того, в цьому Dockerfile ми виконуємо команду chmod, щоб дозволити будь-яким процесам доступ до нових версій файлів.

Ще один необхідний додаток — виконання команди kafka-storage з аргументами format та random-uuid. Річ у тім, що новий протокол Kafka Raft вимагає наявність властивості cluster.id. Вона, по-перше, має бути унікальною (UUID), по-друге, одним і тим самим для кожного вузла в кластері. Тому спочатку виконуємо команду kafka-storage random-uuid для того, щоб згенерувати ідентифікатор, а потім помістити його в конфігураційний файл kafka.properties за допомогою команди kafka-storage format. Лістинг bash-скриптів від Confluent можна подивитися в їх репозиторії.

Тепер можна додати новий сервіс у docker-compose.yml:

  kafka:
    build:
      context: docker-scripts/kafka
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:9092'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:9093'
      KAFKA_LISTENERS: 'PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'

Розберемо в деталях його конфігурацію. Щодо змінних оточення, то, як ви бачите, всі вони починаються з префікса KAFKA_. Уся конфігурація Apache Kafka зберігається на сервері у файлі server.properties (або kafka.properties). Але модифікувати такий файл через Dockerfile не дуже зручно. Тому для Docker-образу для кожної властивості створена відповідна змінна оточення з префіксом KAFKA_, в якій всі символи переведені у верхній регістр, а точки замінені на символ підкреслення. Навіщо потрібні ці налаштування? Їх можна умовно поділити на дві групи: базові та ті, що з’явилися у версії 2.8. Почнемо з другої групи:

  • node.id — спеціальний ідентифікатор вузла у кластері, який потрібно обов’язково вказувати для використання у протоколі KRaft. На зміну прийшов id. При цьому в нашому випадку у нас буде два віртуальні вузли, так як наш сервер є і брокером, і контролером.
  • process.roles — роль(або ролі), які виконує наш сервер. Якщо їх не вказати, то роль контролера буде виконувати Zookeeper.
  • controller.listener.names — оскільки наш сервер є ще й контролером, то потрібно вказати тут його назву (або назви), які будуть використовуватися в інших властивостях (CONTROLLER).
  • controller.quorum.voters — відповідність (map), що містить ідентифікатори та адреси контролерів, які відповідають за керування кластером.

Тепер перейдемо до базових властивостей:

  • listeners — це найважливіша властивість, де вказується, яке ім’я хоста та порт ми будемо використовувати для зв’язку з іншими Kafka-серверами та клієнтами. Оскільки ми вказали дві ролі, то нам потрібні вже два порти для роботи. При цьому ми не використовуємо шифрування, тому вказали протокол PLAINTEXT
  • advertised.listeners — ця властивість схожа на listeners, але його використовують лише клієнтські програми. Необхідність у новій властивості обумовлена тим, що клієнти можуть бути поза Kafka-кластером, за межами firewall, тому їм іноді необхідно вказувати, наприклад, адресу проксі-сервера для успішного доступу.
  • listener.security.protocol.map — відповідність (map) між назвами listeners і протоколом комунікації (ми використовуємо тільки PLAINTEXT). Тому в цьому списку лише два елементи: для брокера, і для контролера.

Крім того, ми вказали папку docker_scripts/kafka для збiрки, саме там має знаходитися Dockerfile і bash-скрипти ensure/configure.

Як ви бачите, заміна Zookeeper на кворум контролер виявилася внутрішньою зміною в самій Kafka, зовнішні клієнти ніяк не спілкуються з ними. Також додамо Akhq контейнер, про нього я розповідав в одній із попередніх статей. Головна його функція — візуалізувати дані та конфігурацію в Kafka кластері, щоб не залежати від клієнта командного рядка:

  akhq:
    image: tchiotludo/akhq
    environment:
      AKHQ_CONFIGURATION: |
        akhq:
          connections:
            docker-kafka-server:
              properties:
                bootstrap.servers: "kafka:9092"
    ports:
      - 8081:8080
      - 28081:28081

Збираємо образи Docker, запускаємо контейнери, Apache Kafka стартує успішно, а за адресою localhost:8081 можна подивитися на те, як налаштований наш Kafka-кластер.

Kafka та Micronaut

Для надсилання повідомлень скористаємося таким перспективним мікро веб-фреймврком як Micronaut. Apache Kafka має свій офіційний Java-клієнт, який можна використовувати як для отримання, так і для відправлення повідомлень. Будь-яка технологія Java, яка призначена для роботи з Kafka, обов’язково буде його використовувати під капотом. Це стосується і Micronaut, Jakarta EE, і Spring Boot.

Micronaut має окремий проєкт Micronaut Kafka для інтеграції з цією messaging system. Додамо цю залежність до нашого проєкту (Maven або Gradle).

              <dependency>
                     <groupId>io.micronaut.kafka</groupId>
                     <artifactId>micronaut-kafka</artifactId>
              </dependency>
    implementation("io.micronaut.kafka:micronaut-kafka")

У сучасних системах переважає декларативний підхід до роботи із зовнішніми компонентами. Можливо тому в Micronaut вирішили застосувати патерн декларативний клієнт, коли ми вказуємо тільки контракт на ту функціональність, яка нам потрібна, а вся реалізація створюється в run-time.

Тому достатньо додати лише інтерфейс PaymentClient:

@KafkaClient(id="payment")
@Topic("payments")
public interface PaymentClient {
 
    void send(@KafkaKey Object key, BaseEvent<?> value);
}

Цей проксі-інтерфейс відповідатиме лише за те, щоб відправляти повідомлення в topic pay-ments. Тому ми додали анотацію @Topic("payments") як частину конфігурації Micronaut. Як Micronaut дізнається, що для нього необхідно на льоту створити клас-реалізацію? Завдяки анотації @KafkaClient, яка перетворює його на бін, доступний для впровадження.

Метод send містить два аргументи:

  • key — ключ повідомлення (ідентифікований за допомогою анотації @KafkaKey)
  • value — значення повідомлення

Ми спеціально оголосили тип ключа як Object для того, щоб не прив’язуватися до конкретного типу ключа, тим більше що Kafka це і не потрібно. А значення завжди має бути спадкоємцем від класу BaseEvent (базового для всіх подій)::

@Getter
@NoArgsConstructor
public abstract class BaseEvent<T> {
      
       private String id;
      
       private String entityId;
      
       private String type;
      
       private String source;
      
       private LocalDateTime createdAt;
      
       private T payload;
 
       public BaseEvent(String entityId, String type, String source, T payload) {
              this.entityId = entityId;
              this.type = type.toLowerCase().replaceAll("_", ".");
              this.source = source;
              this.payload = payload;
              createdAt = LocalDateTime.now();
              id = UUID.randomUUID().toString();
       }
}

Таким чином, KafkaClient — це бін загального призначення для надсилання будь-яких повідомлень у topic payments.

Але ще потрібно вказати адресу Kafka брокера. Якщо ще раз подивитись конфігурацію Kafka в docker-compose.yml, то там як адреса вказано kafka:9092. Це і є те значення, яке необхідно помістити у спеціальну властивість kafka.bootstrap.servers. Додамо цю властивість у application.properties:

kafka.bootstrap.servers=kafka:9092

Тільки тепер Micronaut запустить автоконфігурацію і завантажуватиме конфігурацію для Kafka-клієнта. PaymentClient буде використовуватися в класі PaymentService для відправки події після відповіді від платіжного провайдера наступним чином (тут payment — це платіж, який був відправлений в платіжну систему):

PaymentDTO paymentDTO = transformer.transform(payment, PaymentDTO.class);
BaseEvent<?> event = result.isSuccess()
       ? new PaymentSuccessEvent(payment.getId().toHexString(), source, paymentDTO)
       : new PaymentFailureEvent(payment.getId().toHexString(), source, paymentDTO);
CompletableFuture<RecordMetadata> future = paymentClient.send(payment.getOrderId(), event);
future.thenAcceptAsync(metadata -> log.debug("Event delivered with partition {} and offset {}",        metadata.partition(), metadata.offset()));

А клас PaymentSuccessEvent — це спадкоємець від BaseEvent:

@NoArgsConstructor
public class PaymentSuccessEvent extends BaseEvent<PaymentDTO> {
 
       public PaymentSuccessEvent(String entityId, String source,
                     PaymentDTO payload) {
              super(entityId, PaymentEventType.PAYMENT_SUCCESS.name(), source, payload);
       }
}

Але тепер наші інтеграційні тести для PaymentService почнуть падати, тому що при запуску тестів Kafka сервер не доступний. Тому необхідно оголосити в тестах PaymentClient mock-біном.

       @MockBean(PaymentClient.class)
       PaymentClient paymentClient() {
              return Mockito.mock(PaymentClient.class);
       }

Тепер перевіримо, чи справді повідомлення потрапляють до Kafka. Створимо нове замовлення, оплатимо його і зайдемо в Akhq UI: localhost:8081 Відразу видно, що topic payments створився:

І можна переглянути вміст значення повідомлення. Akhq автоматично визначив його тип (JSON). Цікаво, що ми ніде не вказували цей формат. Він був за замовчуванням обраний Micronaut.

Якщо тепер заглянути в лог для Kafka контейнера, то там якраз і вказується процес створення topic’a payments, причому він створюється автоматично, при першій відправці повідомлення. Кількість розділів для topic’a 1 — це значення за замовчуванням:

INFO [Controller 1] CreateTopics result(s): CreatableTopic(name=’payments’, numPartitions=1, replicationFactor=1, assignments=[], configs=[]): SUCCESS (org.apache.kafka.controller.ReplicationControlManager)

INFO [Controller 1] Created topic payments with topic ID gRyr3y4OTeupFjYq0RekDw. (org.apache.kafka.controller.ReplicationControlManager)

Висновки

У другій частині ми створили Kafka-конфігурацію для запуску через Docker Compose та успішно відправили нотифікацію з Micronaut-додатка. При цьому ми не використовували Zookeeper, а налаштували єдиний Kafka-сервер і як брокер, і як активний контролер. Мануальне тестування не виявило жодних проблем. Так само і Akhq-клієнт без будь-яких складнощів зміг підключитися до Kafka-брокеру з новою конфігурацією. У наступному розділі ми продовжимо знайомитися з розробкою додатків на базі Kafka.

👍ПодобаєтьсяСподобалось16
До обраногоВ обраному4
LinkedIn
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter

З UI для кафки рекомендую provectus/kafka-ui — найкраще з того, що пробував

Не впевнений чи краще, бо не пробував akhq. Але мені особисто подобається і серед того, що пробував однозначно топ

Ми на Cruise Control переходимо, хоча це більше для менеджменту кластерів, ніж для UI . Зараз взагалі Grafana в якості UI

Підписатись на коментарі