Не брокерами єдиними. Нетривіальні методи обміну повідомленнями в розподілених системах

💡 Усі статті, обговорення, новини про DevOps — в одному місці. Приєднуйтесь до DevOps спільноти!

Привіт, я Артем Дорохін, solution architect at Luxoft. Вирішив написати декілька статей, в яких розгляну деякі нестандартні випадки та рішення на проєктах, в яких я брав участь.

Контекст

Багато з тих, хто займається дизайном систем, знайомі з ситуацією, коли для якогось конкретного випадку/задачі/фічі дуже добре лягає використання черги (queue або topic). Але її додавання в архітектуру, впровадження і підтримка пов’язані зі значними труднощами, що можуть перекрити всі переваги. Або неможливі з різного роду причин (організаційні, бізнес/фінансові, технологічні й так далі).

Для початку хотілось би уточнити, що в цьому випадку мова НЕ йде про критичні задачі або задачі, де передбачається велике навантаження на цей елемент системи (тобто якщо задача критична у рамках продукту або можливе велике навантаження, — впроваджувати чергу як окремий елемент системи все-таки необхідно).

Нижче мова піде про альтернативні інструменти, які можна використати для вирішення подібного роду технічних задач, і як вони використовувались в одному з продуктів, на якому я працював.

Case

Є сервіс, написаний на Java/SpringBoot, база даних PostgreSQL, сервіс розгортається в Kubernetes-кластері з деякою кількістю реплік (для даної задачі кількість не є вирішальною, але для розуміння масштабу це 2-20 залежно від навантаження та оточення/енвайроменту (DEV/TEST/AC/PROD і так далі), залежно від ситуації репліки можуть додаватись або видалятись.

👇 А ви вже чули, що 21 червня DOU Mobile Day?

Запити користувачів циклічно (round robin) розподіляються по репліках, тобто запити від одного користувача в рамках однієї взаємодії з системою можуть випадково потрапити на будь-який інстанс сервіса.

Беручи до уваги вищезгадану структуру, для покращення взаємодії з користувачами перед нами постала задача розробки механізму для швидкої (realtime) нотифікації між репліками.

В цьому випадку я хотів би уточнити, що мова іде саме про нотифікації, тобто легкі (lightweight) повідомлення (< 200 символів), що ініціюються як реакція на певні дії користувачів.

Звичайно, першою ідеєю було впровадити який-небудь topic(s), на який підписаний кожен інстанс сервіса в кластері та слухає відповідні події з подальшою реакцією. Але як забезпечити роботу таких нотифікацій між потенційно непостійною кількістю інстансів, які можуть динамічно додаватись і/або видалятись? І очевидною відповіддю на це питання було розуміння, що цей механізм має бути окремим елементом системи, про якого «знає» кожна репліка сервіса.

Класичне рішення — використати один з існуючих або додати message broker мало типові недоліки додавання нового елемента в систему, пов’язане з подальшими розгортанням, інтеграцією, підтримкою (на рівні коду), security, observability, maintenance (як окремого елемента системи) і тому подібними речами, які роблять складні системи складними. Це не дуже подобалося, а також проти такого класичного підходу було те, що сам по собі цей механізм не був критичним для бізнес-цілей системи.

PostgreSQL

Після деяких пошуків потенційне рішення було знайдено. Виявилось, що з обмеженнями (які ми обговоримо нижче) механізм publish/subscribe реалізований в PostgreSQL (www.postgresql.org/...​s/current/sql-notify.html www.postgresql.org/...​s/current/sql-listen.html). Працює дана реалізація наступним чином:

  • Отримувач для отримання нотифікацій виконує «SQL» команду LISTEN <channel-name>; і як результат її виконання буде отримувати всі повідомлення, що будуть надіслані на даний канал. Важливо, що після повернення результату (повідомлення) з’єднання з базою даних не закривається і продовжує блокуватись аж до моменту його явного закриття (або розриву у випадку помилки мережі й так далі).
  • Для відправки повідомлення надсилач має виконати «SQL» команду NOTIFY <channel-name> <MESSAGE>; після чого команда вважається виконаною (ні, не буде підтверджень доставки, ретраїв і тому подібних плюшок). Розмір повідомлення обмежений 8000 байт.
  • Після надсилання всі отримувачі, які в даний момент виконують команду LISTEN з указаною назвою каналу, отримають повідомлення, що було відправлене командою NOTIFY.
  • Є обмежені можливості зрозуміти, що відбувається з відправниками/отримувачами за допомогою внутрішньої таблиці/в’ю pg_stat_activity, яка відображає поточний стан отримувачів, тобто які клієнти активно слухають які канали. І відправників, що є недобрим знаком, оскільки команда NOTIFY має виконуватись швидко (< 1ms). Тобто те, що повідомлення довго надсилається, є індикатором того, що щось пішло не за планом.
  • Також є обмежені можливості подивитись, які канали зараз використовуються за допомогою внутрішньої таблиці/в’ю pg_listening_channels.
  • Можна подивитись міру заповнення «черги» за допомогою команди SELECT pg_notification_queue_usage(); що поверне число від 0 до 1 зі ступінню заповненості місця в черзі (для всіх каналів), де 0 — це пуста черга, а 1 — повна. Розмір черги можна налаштовувати за допомогою конфігураційної змінної max_notify_queue_pages (postgresql.org/...​UC-MAX-NOTIFY-QUEUE-PAGES).

В підсумку можна сказати, що все максимально просто і без жодних надмірностей, для самураїв. Код на Java/Spring також виглядає доволі просто і лаконічно.

Надсилач:

@Component
public class Notifier {

   private final JdbcTemplate jdbcTemplate;

   public Notifier(JdbcTemplate jdbcTemplate) {
       this.jdbcTemplate = jdbcTemplate;
   }

   @NotNull
   public void notify(String channel, String message) {
       jdbcTemplate.execute(createQuery(channel, message));
   }

   static String createQuery(String channel, String message) {
       return "NOTIFY %s '%s';".formatted(channel, message);
   }
}

Отримувач дещо складніше:

// component that handles notification
@Component
public class NotificationListener implements Consumer<PGNotification> {
   // other code
   private DataSource datasource;
   private PgConnection connection;
   @Value("${org.nda.flag:true}")
   private Boolean enabled;
   @Value("${org.nda.channel}")
   private String channel;

   public NotificationListener(DataSource datasource) {
       this.datasource = datasource;
   }

   // Handle notifications according to application logic
   @Override
   public void accept(PGNotification notification) {
       String message = notification.getParameter();
       // other code
   }

   /*
   run query
    */
   @PostConstruct
   public void init() {
       if (enabled) {
           connection = getConnection(this.datasource);
           connection.createStatement().execute("LISTEN " + channel);
       }
   }

   // gracefully close connection in the end
   @PreDestroy
   public void destroy() {
       connection.close();
   }

   // get the original postgres connection
   static PgConnection getConnection(DataSource ds) throws Exception {
       return ds.getConnection().unwrap(PgConnection.class);
   }

   /*
    runnable instance that will in endless loop listen notifications
    */
   public Runnable createListener() {
       return () -> {
           while(!Thread.currentThread().isInterrupted()) {
               PGNotification[] notifications =
                       connection.getNotifications();
               Arrays.stream(notifications).forEach(this::accept);
           }
       };
   }
}


// Configuration class
@Configuration
public class NotificationConfiguration {

   @Bean
   CommandLineRunner startListener(NotificationListener handler) {
       return (args) -> {
           Runnable listener = handler.createListener();
           Thread t = new Thread(listener, "listener");
           t.start();
       };
   }
}

У поточному контексті для розв’язання нашої задачі підходив навіть такий обмежений функціонал, і успіх вже видно на обрії.

Resiliency

Імплементація працювала чудово, але з часом ми почали помічати фантомів. Проявлялось це таким чином: у випадковий момент нотифікації переставали приходити на інстанси, причому така поведінка була не для всіх інстансів в кластері одночасно. Звичайно, пошуки в логах, збірки метрик, спроби відтворити проблему штучним чином (підходи на кшталт "а давайте зімітуємо навантаження 100500 RPS, поставимо на вихідні на тест сервер«і і тому подібне) не дали ніяких результатів.

Як буває, відтворити це вийшло випадково на локальному енвайроменті розробника, і користуючись моментом, вийшло знайти причину такої поведінки. Виявляється, що запит LISTEN... в певні періоди часу на деяких інстансах міг бути idle досить довго, щоб відключати з’єднання бази даних на стороні СУБД без жодних нотифікацій на стороні інтстанса. А інстанс, своєю чергою, продовжував «слухати» по суті закритий канал.

До речі, зі списку «слухачів» (див. pg_stat_activity) він також не видалявся.

Рішенням стало впровадження watchdog, який періодично (наприклад, раз на 10 секунд) посилав SELECT 1 запити на тому ж з’єднанні, де виконувався запит LISTEN, і в разі помилки — реініціалізував отримувача.

Redis

Реалізація вбудованого механізму publish/subscribe також існує для Redis (redis.io/...​/develop/interact/pubsub). В цьому випадку вона не була застосована, але при інших обставинах могла б стати альтернативою реалізації publish/subscribe в PostgreSQL.

Аналогічно з попереднім прикладом, існує набір команд для підписки і паблішингу повідомлень:

  • PUBLISH — команда для публікації повідомлення, приймає як аргумент назву каналу та текст повідомлення, наприклад PUBLISH my.channel «HELLO».
  • SUBSCRIBE — команда, що дозволяє підписатись на канал, як аргумент приймає назву каналу, наприклад, SUBSCRIBE my.channel. Додатково ця команда має 2 модифікації:
    • PSUBSCRIBE — дозволяє підписуватись на декілька каналів з використанням патернів, наприклад PSUBSCRIBE user.* (підписка на всі топіки, назва яких починається з user.*).
    • SSUBSCRIBE — дозволяє підписуватися на канали в випадку, якщо Redis працює в режимі кластера.
  • UNSUBSCRIBE — команда дозволяє відписатись від каналу, також має модифікації PUNSUBSCRIBE і SUNSUBSCRIBE.
  • CHANNELS — дозволяє переглянути активні канали/топіки, виглядає дана команда як: PUBSUB CHANNELS, і має опціональний аргумент pattern, що дозволяє фільтрувати список результатів. В режимі кластера дана команда повертає список всіх каналів у даному кластері.

Імплементація для Java/SpringBoot в цьому випадку буде виглядати простіше, оскільки уже включена в Spring Data Redis API.

Отримувач:

public class RedisListener implements MessageListener {

   @Override
   public void onMessage(Message message, byte[] pattern) {
       var mesageBody = new String(message.getBody());
       // application logic here
   }
}

Відправник:

@Component
public class RedisNotifier {


   private final RedisTemplate<String, Object> redisTemplate;
   @Value("org.nda.channel")
   private String channel;


   public void sendNotification(String message) {
       redisTemplate.convertAndSend(channel, message);
   }
}

Конфігурація:

@Configuration
public class RedisConfiguration {

   @Value("org.nda.channel")
   private String channel;

   @Bean
   MessageListenerAdapter messageListener() {
       return new MessageListenerAdapter(new RedisListener());
   }

   @Bean
   RedisMessageListenerContainer redisContainer() {
       RedisMessageListenerContainer container
               = new RedisMessageListenerContainer();
       container.setConnectionFactory(connectionFactory());
       container.addMessageListener(messageListener(), topic());
       return container;
   }

   @Bean
   Topic topic() {
       return new ChannelTopic(channel); // simplified
   }

   @Bean
   RedisConnectionFactory connectionFactory() {
       return new JedisConnectionFactory(); // simplified example
   }
}

Висновки

Крім розглянутих вище продуктів функціональність queue/pubsub у різних варіаціях реалізована в:

*список не є вичерпним і в ньому перелічені радше широко розповсюджені продукти.

Як правило, як і в розглянутих випадках, наведені реалізації містять деякі специфічні обмеження. І загалом можна сказати, уступають класичним брокерам повідомлень за багатьма критеріями, тому необхідно додатково проводити ретельний аналіз і дослідження щодо доцільності використання альтернативного підходу, наведеного в статті.

Виходячи з власного досвіду, я рекомендував би скористатися наступною схемою прийняття рішень з урахуванням, що брокери, які розглядаються, відповідають конкретним функціональним і нефункціональним вимогам вашого проєкту/продукту (в першу чергу, зважаючи на ВАШІ задачі та контекст).

👍ПодобаєтьсяСподобалось10
До обраногоВ обраному4
LinkedIn
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter

згадав часи коли pg_notify у нас перестав справлятись (ну бо бізнес клієнта ріс слава богу) і як вирішення проблеми ми... перейшли на черги:)

classic closed socket problem

досить довго, щоб відключати з’єднання бази даних на стороні СУБД без жодних нотифікацій

в звичайному легкому брокері це вирішується за рахунок вбудованого механізму qos (нп qos=1 qos=2), в тому числі і коли відвалюється коннект (якщо я правильно зрозумів що то описано з бд)

Підписатись на коментарі