Сучасна диджитал-освіта для дітей — безоплатне заняття в GoITeens ×
Mazda CX 5
×

Починаємо роботу з Apache Kafka. Частина III

Підписуйтеся на Telegram-канал «DOU #tech», щоб не пропустити нові технічні статті

Всім привіт. Я Сергій Моренець, розробник, викладач, спікер та технічний письменник. Хочу поділитися своїм досвідом роботи з такою цікавою технологією як Apache Kafka та розкрити ті теми, що з нею пов’язані. У попередніх статтях я розповів про внутрішні особливості Kafka та конфігурацію та деплоймент відправників повідомлень. Якщо ви твердо вирішили використовувати Kafka у своїх додатках, то тепер потрібно ознайомитися з тим, як тестувати Kafka додатки, а також детальніше торкнемося конфігурації відправника. Крім того, ми розглядаємо цей матеріал на деяких тренінгах. Наприклад, візьмем абстрактну систему, побудовану на мікросервісах, у якій платіжний сервіс управляє платежами. І кожен платіж (успішний чи невдалий) повинен призводити до відправки події (нотифікації), для чого ми будемо використовувати Kafka.

Тестуємо Kafka додатки

У минулій статті ми додали можливість надсилати повідомлення з платіжного сервісу, але нам не вистачає однієї важливої речі, щоб вважати своє завдання закінченим. Це, зрозуміло, інтеграційні тести тих компонентів, де використовується @KafkaClient. У минулому розділі нам довелося мокувати PaymentClient скрізь, де він використовувався.

На щастя, розробники Kafka теж розуміли важливість цього моменту, тому створили проєкт Embedded Kafka, завдяки якому ви можете запускати спеціальний in-memory сервер, який емулює роботу Kafka.

Цей проєкт з’явився у 2015 році, тобто практично є ровесником Kafka. Він прийшов на зміну іншій бібліотеці kafka-unit, яка не підтримується з 2019 року. На відміну від неї, Embedded Kafka регулярно випускає оновлення, і ось зовсім недавно вийшла підтримка Kafka 3.2.1. Цікаво, що якщо Kafka написана на Java та Scala, то Embedded Kafka можна розміщувати стікер «100% Scala» у своєму GitHub-репозиторії.

Нам нічого не заважає його використовувати, тим більше, що він забезпечує максимальну продуктивність у тестах. Крім того, не потрібно особливої конфігурації Kafka-брокерів, тому що весь проєкт розгортається локально. Але, з іншого боку, як і у випадку з іншим популярним проєктом Embedded Mongo, тут є свої мінуси:

  • Ми не можемо гарантувати, що наш код буде працювати на production так само, як і в тестах.
  • Якщо Java клієнт для Apache Kafka випускається для кожної нової версії Kafka (наприклад, 2.6.1, 2.6.2, 2.6.3), то Embedded Kafka випускають для основних версій Kafka, i ви можете використовувати або 2.6.0 або 2.7.0.
  • Embedded Kafka — це вузькоспеціалізоване рішення універсальної проблеми (написання інтеграційних тестів, які використовують зовнішні сервери).

Зрозуміло, цікавіше застосувати універсальне рішення для будь-якої ситуації. Це і є альтернатива Embedded Kafka — бібліотека TestContainers, яку ми розглянемо. Якщо ви ще не знайомі з нею, то в цілому, це обгортка навколо Java клієнта для Docker, яка дозволяє запускати та налаштовувати Docker-контейнери у ваших тестах. До того ж, вона підтримує не тільки Docker, але і Docker Compose. Більше того, вона вже портована на найпопулярніші платформи, включаючи .NET, Python, Node.JS, Rust та багато інших.

Можна було б використовувати готову залежність org.testscontainers:kafka, де є спеціальний клас KafkaContainer, який забезпечує базову конфігурацію, достатню для тестування (один кластер з одного брокера). Але, на жаль, він все ще зав’язаний на Zookeeper і є певні складнощі в тому, щоб змусити його використовувати новий режим, де Zookeeper відсутний. Тому ми будемо використовувати стандартний клас GenericContainer, який може працювати з будь-яким контейнером Docker.

Додамо базовий клас для всіх інтеграційних тестів:

@MicronautTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@Testcontainers
public abstract class BaseIntegrationTest implements TestPropertyProvider {
}

Тут вказано три ключові анотації:

  • @MicronautTest — інтеграція Micronaut і JUnit
  • @TestContainers — інтеграція TestContainers і Junit
  • @TestInstance(TestInstance.Lifecycle.PER_CLASS) — декларує, що ми створюємо об’єкт тестового класу для всіх тестів (а не об’єкт для кожного тесту)

Тепер додамо тестовий клас для PaymentService:

 public class PaymentServiceTest extends BaseIntegrationTest {
 
       static final int KAFKA_LOCAL_PORT;
 
       static final GenericContainer<?> kafka;
 
       static {
              kafka = new GenericContainer<>(DockerImageName.parse("confluentinc/cp-kafka:7.1.1"));
              kafka.start();
              KAFKA_LOCAL_PORT = kafka.getMappedPort(KafkaContainer.KAFKA_PORT);
       }
 
       @Override
       public Map<String, String> getProperties() {
              Map<String, String> properties = new HashMap<>(super.getProperties());
              properties.put("KAFKA_PORT", String.valueOf(KAFKA_LOCAL_PORT));
 
              return properties;
       }

GenericContainer — це обгортка над Docker-контейнером, який ми можемо запустити вручну (метод start()) або декларативно (додавши анотацію @Container). Запущені контейнери автоматично зупиняються та видаляються після завершення всіх тестів. Та все би добре, але ми заздалегідь не знаємо той локальний порт з Kafka-контейнера, котрий буде використаний для port mapping. Він буде обраний випадковим чином. Тому потрібно вийняти його з GenericContainer на початку тесту за допомогою методу getMappedPort і потім підсунути Kafka клієнту.

Тепер додамо в тестовий файл application.yml конфігурацію для Kafka bootstrap servers і введемо властивість KAFKA_PORT, яку ми перевизначили у методі getProperties:

kafka:
  bootstrap:
    servers: localhost:${KAFKA_PORT}

Але навіть у такій неповній конфігурації (а ми ще не додали деякі необхідні перемінні оточення) ми отримуємо помилку в самому ядрі TestContainers при спробі запустити тест:

[testcontainers-wait-0] WARN org.testcontainers.containers.wait.internal.InternalCommandPortListeningCheck — An exception while executing the internal check: Container.ExecResult(exitCode=137, stdout=, stderr=Ncat: Version 7.70 ( nmap.org/ncat )

Ncat: Connection to 127.0.0.1 failed: Connection refused.

Ncat: Trying next address...

Ncat: Cannot assign requested address.

/bin/bash: connect: Cannot assign requested address

/bin/bash: /dev/tcp/localhost/9092: Cannot assign requested address

)

[main] INFO 🐳 [confluentinc/cp-kafka:7.1.1] — Container confluentinc/cp-kafka:7.1.1 started in PT0.6145648S

Детальне дослідження показує, що таку проблему зафіксовано ще торік і досі не вирішено. Єдиний можливий варіант — відкотитись у тестах на версію Docker образу 6.2.1, останню, яка є робочою. Але ця версія використовує Kafka більш ранньої версії, ніж 2.8. А це означає, що нам все ж таки доведеться використовувати Zookeeper і простіше все ж таки використовувати для цього готовий клас KafkaContainer. Додамо необхідні залежності

               <dependency>
                     <groupId>org.testcontainers</groupId>
                     <artifactId>kafka</artifactId>
                     <scope>test</scope>
              </dependency>
    testImplementation("org.testcontainers:kafka")   

Тепер ключовий код для запуску контейнера Kafka буде виглядати наступним чином:

       static {
              kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
              kafka.start();
              KAFKA_LOCAL_PORT = kafka.getMappedPort(KafkaContainer.KAFKA_PORT);
       }

Запускаємо тести. Усі тести у платіжному сервісі виконуються успішно. Повністю тестовий клас виглядає так:

 public class PaymentServiceTest extends BaseIntegrationTest {
        static final int KAFKA_LOCAL_PORT;
 
       static final KafkaContainer kafka;
 
       @Inject
       PaymentService paymentService;
 
       @Inject
       PaymentRepository paymentRepository;
 
       static {
              kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
              kafka.start();
              KAFKA_LOCAL_PORT = kafka.getMappedPort(KafkaContainer.KAFKA_PORT);
       }
 
       @Test
       void save_validPayment_success() {
              Payment payment = new Payment();
              payment.setAmount(100);
              payment.setPaymentType(PaymentType.CHECKOUT);
              payment.setOrderId(11);
              payment.setProvder("Paypal");
              payment.setCreatedBy("100");
 
              String paymentId = paymentService.create(payment);
              assertNotNull(paymentId);
 
              Payment payment3 = paymentRepository.findById(paymentId);
              assertNotNull(payment3.getCreatedAt());
              assertTrue(payment3.isSuccess());
              assertNull(payment3.getErrorDescription());
       }
 
       @Override
       public Map<String, String> getProperties() {
              Map<String, String> properties = new HashMap<>(super.getProperties());
              properties.put("KAFKA_PORT", String.valueOf(KAFKA_LOCAL_PORT));
 
              return properties;
       }
}

Цей тест був спрощений і в ньому немає перевірки того, що повідомлення справді потрапили в потрібний topic. Ми розглянемо ці перевірки у наступній статті.

Kafka producer configuration

Тепер давайте поговоримо більш докладно про те, як взагалі відбувається процес відправлення повідомлень. Для цього ще раз розглянемо інтерфейс PaymentClient:

@KafkaClient(id="payment")
@Topic("payments")
public interface PaymentClient {
 
    void send(@KafkaKey Object key, BaseEvent<?> value);
}

Ми використовуємо метод send у DefaultPaymentService:

if (result.isSuccess()) {
       paymentClient.send(payment.getOrderId(),
              new PaymentSuccessEvent(payment.getId().toHexString(), source, paymentDTO));
} else {
       paymentClient.send(payment.getOrderId(),
              new PaymentFailureEvent(payment.getId().toHexString(), source, paymentDTO));
}

І, зрозуміло, під час аналізу цього коду виникає закономірне питання. Що відбувається з повідомленням після виконання методу send ()? Повідомлення тільки надіслано на сервер? Або вона вже доставлена з відповідним підтвердженням?

Вивчаючи Java Core, постійно стикаєшся з тим, що при надсиланні даних через мережу (Java I/O) або якісь зовнішні канали вкрай небажано відправляти дані по одному запису (об’єкту), як тільки ми їх отримаємо від клієнта. Саме тому рекомендується завжди використовувати, наприклад, BufferedOutputStream, а не FileOutputStream безпосередньо. Тимчасові витрати на надсилання одного повідомлення надто великі, щоб ми відправляли їх по одному.

Аналогічно і з Kafka. Коли ми надсилаємо повідомлення для відправки, воно проходить через 7 стадій:

  • Серіалізація ключа (в масив байт), якщо він присутній.
  • Серіалізація значення.
  • Визначення індексу розділу (якщо їх кількість більша за один).
  • Визначення статусу поточної транзакції.
  • Приміщення запису у внутрішній буфер і формування групи повідомлень (batch) для надсилання.
  • Надсилання повідомлення на сервер.
  • Отримання метаданих про доставку.

Тобто навіть після виконання методу send() може перебувати в будь-якому внутрішньому буфері і в разі збою додатка не буде доставлено. Це дуже важливий момент. Але що, якщо повідомлення було надіслано, але якась причина не була прийнята брокером (наприклад, проблеми з мережею або перевантаженість сервера)? Kafka призначена не тільки для швидкого, але й для надійного відправлення даних. У попередніх статтях ми познайомилися з властивостями Kafka сервера, коли додавали його до конфігурації Docker Compose. Але і для клієнтської частини є чимало властивостей, які допомагають підвищити надійність комунікації. Насамперед, ця кількість повторів:

kafka:
  producers:
    default:
      retries: 3

За замовченням, у разі невдачі, Kafka клієнт спробує надіслати повідомлення ще тричі і лише потім видасть помилку. Але навіть якщо повідомлення успішно доставлено до брокера, це не означає, що воно синхронізоване з іншими репліками. У Kafka клієнта (а точніше, у класі ProducerConfig) є ще одна цікава властивість — acks:

kafka:
  producers:
    default:
      acks: 0

Воно може набувати трьох значень:

  • 0 — ми не чекаємо на підтвердження від сервера;
  • 1 — ми чекаємо на підтвердження, що сервер успішно зберіг наше повідомлення;
  • -1 — ми чекаємо на підтвердження, що сервер переслав повідомлення всім реплікам.

Вибираючи між цими трьома значеннями, ви визначаєте, що вам важливіше — швидкість (0) чи надійність відправки (-1).

У цьому блоці конфігурації цікавим є слово «default». Це означає, що це значення буде застосовано для всіх @KafkaClient. Якщо ми хочемо змінити якусь властивість лише одного @KafkaClient, необхідно вказати не default, а значення атрибута id.

Таким чином, ми вже зрозуміли, що виконання методу send() не означає, що повідомлення почало відправлятися. Але що, якщо нам потрібно в коді отримати нотифікацію про успішну відправку та залогувати номер розділу або offset повідомлення? У Kafka клієнта є спеціальний клас RecordMetadata, який містить повну відповідь від сервера. Але повертати його прямо з методу не дуже добре:

public interface PaymentClient {
     RecordMetadata send(@KafkaKey Object key, BaseEvent<?> value);
}

Тому що ми блокуємо поточний потік, і це не дуже добре позначиться на продуктивності. Як вихід — використовувати реактивний підхід, тим більше що Micronaut підтримує відразу три варіанти рішення:

  • За допомогою RxJava 2.
  • За допомогою Reactor.
  • За допомогою JDK (CompletableFuture).

Оскільки у нас немає ні першої, ні другої бібліотеки в платіжному сервісі, то нам доведеться використовувати такий перевірений механізм як CompletableFuture з Java 8:

public interface PaymentClient {
     CompletableFuture<RecordMetadata> send(@KafkaKey Object key, BaseEvent<?> value);
}

Після цього ми можемо в DefaultPaymentService вивести в лог метадані про відправлене повідомлення, при чому вони будуть виведені тільки після отримання підтвердження від сервера:

 BaseEvent<?> event = result.isSuccess()
       ? new PaymentSuccessEvent(payment.getId().toHexString(), source, paymentDTO)
       : new PaymentFailureEvent(payment.getId().toHexString(), source, paymentDTO);
CompletableFuture<RecordMetadata> future = paymentClient.send(payment.getOrderId(), event);
future.thenAcceptAsync(metadata -> log.debug("Event delivered with partition {} and offset {}",        metadata.partition(), metadata.offset()));

У цьому підході є одна дуже тонка каверза. За конвенцією, якщо метод повертає Future (CompletableFuture), це означає, що він не блокує. Однак за певних умов виконання методу send може блокуватись, наприклад, якщо відправник не зміг отримати метадані про розділ або черга повідомлень переповнена. Обговорення цієї проблеми триває з 2016 року, і розробники Kafka все ще не дійшли остаточного рішення. Проте про це варто пам’ятати.

Запускаємо тести, усі тести проходять успішно. Ще одна важлива деталь — серіалізація. Micronaut за замовчуванням використовує серіалізацію об’єкта у формат JSON. Але чому? У Kafka клієнті є два ключові інтерфейси для серіалізації — Serializer та Deserializer. І там є вбудовані реалізації для примітивних типів і рядків. Але не Java об’єктів. Тому будь-який проєкт, який хоче інтеграцію з Kafka, має її надати. І в Micronaut Kafka є проста реалізація цих інтерфейсів, що під капотом використовує бібліотеку Jackson:

@Prototype
public class JsonSerde<T> implements Serializer<T>, Deserializer<T>, Serde<T> {
 
    private final JacksonObjectSerializer objectSerializer;
    private final Class<T> type;

Вона вибирається за замовчуванням у біні CompositeSerdeRegistry, якщо ключ або значення повідомлення не є примітивним об’єктом. Але якщо стандартна реалізація вам не підходить, то можна її замінити двома способами:

  • Перевизначити цей бін і підсунути свою реалізацію (наприклад, Protobuf).
  • В application.yml вказати конкретний клас для @KafkaClient.

Висновки

Отже, у цій статті ми додали інфраструктуру для інтеграційного тестування компонентів, які використовують Apache Kafka. На жаль, бібліотека TestContainers не підтримує останні версії Kafka та останні версії Docker образу для неї, що створило нам певні труднощі при тестуванні. Будемо сподіватися на якнайшвидше вирішення проблеми, щоб можна було в тестах використовувати ту ж версію Kafka, що і на production. Також ми розглянули деякі налаштування відправника повідомлень і, в цілому, весь процес відправки, починаючи від створення повідомлення до його отримання брокером.

👍ПодобаєтьсяСподобалось8
До обраногоВ обраному6
LinkedIn
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter

Підписатись на коментарі