Не брокерами єдиними. Нетривіальні методи обміну повідомленнями в розподілених системах
Привіт, я Артем Дорохін, solution architect at Luxoft. Вирішив написати декілька статей, в яких розгляну деякі нестандартні випадки та рішення на проєктах, в яких я брав участь.
Контекст
Багато з тих, хто займається дизайном систем, знайомі з ситуацією, коли для якогось конкретного випадку/задачі/фічі дуже добре лягає використання черги (queue або topic). Але її додавання в архітектуру, впровадження і підтримка пов’язані зі значними труднощами, що можуть перекрити всі переваги. Або неможливі з різного роду причин (організаційні, бізнес/фінансові, технологічні й так далі).
Для початку хотілось би уточнити, що в цьому випадку мова НЕ йде про критичні задачі або задачі, де передбачається велике навантаження на цей елемент системи (тобто якщо задача критична у рамках продукту або можливе велике навантаження, — впроваджувати чергу як окремий елемент системи все-таки необхідно).
Нижче мова піде про альтернативні інструменти, які можна використати для вирішення подібного роду технічних задач, і як вони використовувались в одному з продуктів, на якому я працював.
Case
Є сервіс, написаний на Java/SpringBoot, база даних PostgreSQL, сервіс розгортається в Kubernetes-кластері з деякою кількістю реплік (для даної задачі кількість не є вирішальною, але для розуміння масштабу це
👇 А ви вже чули, що 21 червня DOU Mobile Day?
Запити користувачів циклічно (round robin) розподіляються по репліках, тобто запити від одного користувача в рамках однієї взаємодії з системою можуть випадково потрапити на будь-який інстанс сервіса.
Беручи до уваги вищезгадану структуру, для покращення взаємодії з користувачами перед нами постала задача розробки механізму для швидкої (realtime) нотифікації між репліками.
В цьому випадку я хотів би уточнити, що мова іде саме про нотифікації, тобто легкі (lightweight) повідомлення (< 200 символів), що ініціюються як реакція на певні дії користувачів.
Звичайно, першою ідеєю було впровадити який-небудь topic(s), на який підписаний кожен інстанс сервіса в кластері та слухає відповідні події з подальшою реакцією. Але як забезпечити роботу таких нотифікацій між потенційно непостійною кількістю інстансів, які можуть динамічно додаватись і/або видалятись? І очевидною відповіддю на це питання було розуміння, що цей механізм має бути окремим елементом системи, про якого «знає» кожна репліка сервіса.
Класичне рішення — використати один з існуючих або додати message broker мало типові недоліки додавання нового елемента в систему, пов’язане з подальшими розгортанням, інтеграцією, підтримкою (на рівні коду), security, observability, maintenance (як окремого елемента системи) і тому подібними речами, які роблять складні системи складними. Це не дуже подобалося, а також проти такого класичного підходу було те, що сам по собі цей механізм не був критичним для бізнес-цілей системи.
PostgreSQL
Після деяких пошуків потенційне рішення було знайдено. Виявилось, що з обмеженнями (які ми обговоримо нижче) механізм publish/subscribe реалізований в PostgreSQL (www.postgresql.org/...s/current/sql-notify.html www.postgresql.org/...s/current/sql-listen.html). Працює дана реалізація наступним чином:
- Отримувач для отримання нотифікацій виконує «SQL» команду LISTEN <channel-name>; і як результат її виконання буде отримувати всі повідомлення, що будуть надіслані на даний канал. Важливо, що після повернення результату (повідомлення) з’єднання з базою даних не закривається і продовжує блокуватись аж до моменту його явного закриття (або розриву у випадку помилки мережі й так далі).
- Для відправки повідомлення надсилач має виконати «SQL» команду NOTIFY <channel-name> <MESSAGE>; після чого команда вважається виконаною (ні, не буде підтверджень доставки, ретраїв і тому подібних плюшок). Розмір повідомлення обмежений 8000 байт.
- Після надсилання всі отримувачі, які в даний момент виконують команду LISTEN з указаною назвою каналу, отримають повідомлення, що було відправлене командою NOTIFY.
- Є обмежені можливості зрозуміти, що відбувається з відправниками/отримувачами за допомогою внутрішньої таблиці/в’ю pg_stat_activity, яка відображає поточний стан отримувачів, тобто які клієнти активно слухають які канали. І відправників, що є недобрим знаком, оскільки команда NOTIFY має виконуватись швидко (< 1ms). Тобто те, що повідомлення довго надсилається, є індикатором того, що щось пішло не за планом.
- Також є обмежені можливості подивитись, які канали зараз використовуються за допомогою внутрішньої таблиці/в’ю pg_listening_channels.
- Можна подивитись міру заповнення «черги» за допомогою команди SELECT pg_notification_queue_usage(); що поверне число від 0 до 1 зі ступінню заповненості місця в черзі (для всіх каналів), де 0 — це пуста черга, а 1 — повна. Розмір черги можна налаштовувати за допомогою конфігураційної змінної max_notify_queue_pages (postgresql.org/...UC-MAX-NOTIFY-QUEUE-PAGES).
В підсумку можна сказати, що все максимально просто і без жодних надмірностей, для самураїв. Код на Java/Spring також виглядає доволі просто і лаконічно.
Надсилач:
@Component public class Notifier { private final JdbcTemplate jdbcTemplate; public Notifier(JdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; } @NotNull public void notify(String channel, String message) { jdbcTemplate.execute(createQuery(channel, message)); } static String createQuery(String channel, String message) { return "NOTIFY %s '%s';".formatted(channel, message); } }
Отримувач дещо складніше:
// component that handles notification @Component public class NotificationListener implements Consumer<PGNotification> { // other code private DataSource datasource; private PgConnection connection; @Value("${org.nda.flag:true}") private Boolean enabled; @Value("${org.nda.channel}") private String channel; public NotificationListener(DataSource datasource) { this.datasource = datasource; } // Handle notifications according to application logic @Override public void accept(PGNotification notification) { String message = notification.getParameter(); // other code } /* run query */ @PostConstruct public void init() { if (enabled) { connection = getConnection(this.datasource); connection.createStatement().execute("LISTEN " + channel); } } // gracefully close connection in the end @PreDestroy public void destroy() { connection.close(); } // get the original postgres connection static PgConnection getConnection(DataSource ds) throws Exception { return ds.getConnection().unwrap(PgConnection.class); } /* runnable instance that will in endless loop listen notifications */ public Runnable createListener() { return () -> { while(!Thread.currentThread().isInterrupted()) { PGNotification[] notifications = connection.getNotifications(); Arrays.stream(notifications).forEach(this::accept); } }; } } // Configuration class @Configuration public class NotificationConfiguration { @Bean CommandLineRunner startListener(NotificationListener handler) { return (args) -> { Runnable listener = handler.createListener(); Thread t = new Thread(listener, "listener"); t.start(); }; } }
У поточному контексті для розв’язання нашої задачі підходив навіть такий обмежений функціонал, і успіх вже видно на обрії.
Resiliency
Імплементація працювала чудово, але з часом ми почали помічати фантомів. Проявлялось це таким чином: у випадковий момент нотифікації переставали приходити на інстанси, причому така поведінка була не для всіх інстансів в кластері одночасно. Звичайно, пошуки в логах, збірки метрик, спроби відтворити проблему штучним чином (підходи на кшталт "а давайте зімітуємо навантаження 100500 RPS, поставимо на вихідні на тест сервер«і і тому подібне) не дали ніяких результатів.
Як буває, відтворити це вийшло випадково на локальному енвайроменті розробника, і користуючись моментом, вийшло знайти причину такої поведінки. Виявляється, що запит LISTEN... в певні періоди часу на деяких інстансах міг бути idle досить довго, щоб відключати з’єднання бази даних на стороні СУБД без жодних нотифікацій на стороні інтстанса. А інстанс, своєю чергою, продовжував «слухати» по суті закритий канал.
До речі, зі списку «слухачів» (див. pg_stat_activity) він також не видалявся.
Рішенням стало впровадження watchdog, який періодично (наприклад, раз на 10 секунд) посилав SELECT 1 запити на тому ж з’єднанні, де виконувався запит LISTEN, і в разі помилки — реініціалізував отримувача.
Redis
Реалізація вбудованого механізму publish/subscribe також існує для Redis (redis.io/.../develop/interact/pubsub). В цьому випадку вона не була застосована, але при інших обставинах могла б стати альтернативою реалізації publish/subscribe в PostgreSQL.
Аналогічно з попереднім прикладом, існує набір команд для підписки і паблішингу повідомлень:
- PUBLISH — команда для публікації повідомлення, приймає як аргумент назву каналу та текст повідомлення, наприклад PUBLISH my.channel «HELLO».
- SUBSCRIBE — команда, що дозволяє підписатись на канал, як аргумент приймає назву каналу, наприклад, SUBSCRIBE my.channel. Додатково ця команда має 2 модифікації:
- PSUBSCRIBE — дозволяє підписуватись на декілька каналів з використанням патернів, наприклад PSUBSCRIBE user.* (підписка на всі топіки, назва яких починається з user.*).
- SSUBSCRIBE — дозволяє підписуватися на канали в випадку, якщо Redis працює в режимі кластера.
- UNSUBSCRIBE — команда дозволяє відписатись від каналу, також має модифікації PUNSUBSCRIBE і SUNSUBSCRIBE.
- CHANNELS — дозволяє переглянути активні канали/топіки, виглядає дана команда як: PUBSUB CHANNELS, і має опціональний аргумент pattern, що дозволяє фільтрувати список результатів. В режимі кластера дана команда повертає список всіх каналів у даному кластері.
Імплементація для Java/SpringBoot в цьому випадку буде виглядати простіше, оскільки уже включена в Spring Data Redis API.
Отримувач:
public class RedisListener implements MessageListener { @Override public void onMessage(Message message, byte[] pattern) { var mesageBody = new String(message.getBody()); // application logic here } }
Відправник:
@Component public class RedisNotifier { private final RedisTemplate<String, Object> redisTemplate; @Value("org.nda.channel") private String channel; public void sendNotification(String message) { redisTemplate.convertAndSend(channel, message); } }
Конфігурація:
@Configuration public class RedisConfiguration { @Value("org.nda.channel") private String channel; @Bean MessageListenerAdapter messageListener() { return new MessageListenerAdapter(new RedisListener()); } @Bean RedisMessageListenerContainer redisContainer() { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory()); container.addMessageListener(messageListener(), topic()); return container; } @Bean Topic topic() { return new ChannelTopic(channel); // simplified } @Bean RedisConnectionFactory connectionFactory() { return new JedisConnectionFactory(); // simplified example } }
Висновки
Крім розглянутих вище продуктів функціональність queue/pubsub у різних варіаціях реалізована в:
- MongoDB (event stream www.mongodb.com/...ocs/manual/changeStreams).
- Oracle (queue + pubsub www.oracle.com/...atabase/advanced-queuing).
- Microsoft SQL Server (queue learn.microsoft.com/...sql-server-service-broker).
*список не є вичерпним і в ньому перелічені радше широко розповсюджені продукти.
Як правило, як і в розглянутих випадках, наведені реалізації містять деякі специфічні обмеження. І загалом можна сказати, уступають класичним брокерам повідомлень за багатьма критеріями, тому необхідно додатково проводити ретельний аналіз і дослідження щодо доцільності використання альтернативного підходу, наведеного в статті.
Виходячи з власного досвіду, я рекомендував би скористатися наступною схемою прийняття рішень з урахуванням, що брокери, які розглядаються, відповідають конкретним функціональним і нефункціональним вимогам вашого проєкту/продукту (в першу чергу, зважаючи на ВАШІ задачі та контекст).
3 коментарі
Додати коментар Підписатись на коментаріВідписатись від коментарів