Як масштабувати мікросервіси в Azure за допомогою принципу CQRS
Підписуйтеся на Telegram-канал «DOU #tech», щоб не пропустити нові технічні статті
Я Сергій Селецький, Senior Solution Architect в компанії Intellias та експерт в Centers of Excellence — підрозділі, який займається підвищенням кваліфікації інженерів і розробки загалом. Автор Intellias CQRS фреймворку, який використовується на декількох проєктах в Intellias. Маю понад 12 років досвіду роботи з .NET та Azure Cloud.
У цій статті я розповім про принцип імперативного програмування CQRS, його походження, складові та практичне використання для масштабування мікросервісів в Azure. Розглянемо реальні приклади структури проєкту та поговоримо про тестування систем з CQRS-архітектурою.
CQRS: огляд та теорія
Перш ніж говорити про CQRS, згадаймо, звідки він походить, а саме CQS. Це принцип імперативного програмування та розподілення команд і запитів. Зазвичай, він використовується через бібліотеку MediatR в .NET і все працює в рамках одного застосунку. З іншого боку, CQRS — це той самий CQS патерн, але використовується він на рівні архітектури всієї системи.
У CQRS система розділяється на дві частини: write та read. Зазвичай на write-частині розташована база даних, яка забезпечує цілісність даних, а на read-частині є інша база даних, яка витримує високе навантаження на читання і добре масштабується. Дві частини об’єднуються через Service Bus, Event Hub або Kafka. Поверх них розташований фасад API Gateway, який агрегує всі API, тому для кінцевого користувача система виглядає як звичайний мікросервіс чи моноліт.
Write-частина працює так: у REST Controller створюється команда, до якої додається необхідна інформація — контекст ідентифікатора користувача тощо. Після цього команда запускає Command Handler, де ми опрацьовуємо команду і створюємо подію. Остання зі свого боку зберігається в Event Store. У той самий час на read-частині у нас є Event Handler, який слухає чергу подій і застосовує подію на саму Query модель читання даних.
Перевага такої моделі полягає в тому, що ми можемо нескінченно масштабувати read-частину, додаючи все більше і більше інстансів.
Крім того, якщо ви знайомі з САР-теоремою, то знаєте, що ми можемо використовувати дві парадигми кожної частини системи на свою користь. Наприклад, дані на write-частині є цілісними та доступними, тож ми маємо змогу вибрати необхідні бази даних, з ASID транзакційністю тощо. У той самий час можемо забезпечити хороший partition tolerance на read-частині. Тобто для кожної потреби системи можемо обрати кращі характеристики.
Використання CQRS-системи на практиці
Познайомившись з теорією, переходимо до практики на платформі Azure.
Це спрощена діаграма, адже в реальних проєктах використовуються десятки додаткових сервісів: Azure FrontDoor, Traffic Manager тощо. Але такої діаграми цього достатньо, щоб продемонструвати роботу системи.
Все починається із запиту від застосунку Angular, який спрямовує запит до API Management та конвертує його у команду. У цей час відбувається валідація. Наприклад, якщо ви вводите 100 000 знаків у поле, яке розраховане на 250, такий запит не буде відправлено на конвертацію. Потім конвертована команда відправляється в командну шину. Ми надаємо клієнтові унікальний код команди, за яким її можна відслідковувати. Далі команда активується і виконується в Command Handler, події потрапляють на Event Handler. Після цього клієнтові приходить сповіщення, що операцію завершено.
Розглянемо це більш детально на одному сервісі.
Зауважте, що архітектура сервісу дещо змінена. Тут замість API Management використовується app service. Він дає нам додаткові переваги, а саме зменшує затримки та дозволяє використовувати інші дуплексні протоколи, крім REST та GraphQL.
Коли на app сервіс приходить запит, в роботу вмикається пустий Controller. Все, що він робить, — валідує атрибути на ту модель, яка надходить, та додає додаткову інформацію з http контексту.
public async Task<IActionResult> Post(MyActionModel model) { var command = model.Map<TCommand>(); // AutoMapper. await EnrichCommandAsync(command, httpContext); // Add JWT and telemetry data from HTTP. var validationResult = command.Validate(); // Contract validation, simple rules, No BL. if (validationResult.Success) { validationResult = await validator.VerifyAsync(command); // Constraints validation. } if (!validationResult.Success) { return NotifySubscription(command, (FailedResult)validationResult); // SignalR. } await commandQueue.PublishAsync(command); // Publish to storage queue. return Ok(command.CorrelationId); // Return ID for tracking result. }
Це може бути інформація з JWT-токена, дані про користувачів, їхня IP-адреса, локація тощо. Після цього команда відправляється на командну шину. Тут ми можемо зробити системну валідацію, адже в самих Command Handler-ах немає міжсервісного контексту. Тому, якщо ми хочемо, перевірити, наприклад, унікальність імені в системі або ж подивитись, чи провалідовані атрибути, це треба робити тут.
Далі переходимо до Command Handler. Тут є звичайний Azure Function Trigger, в якому ми розпаковуємо команду. За допомогою GetCommand методу ми десерилізовуємо команду та отримуємо вже сам об’єкт команди, який далі обробляється в Command Handler-і. Інакше кажучи, ми відтворюємо Aggregate Root бізнес-логіки, передаючи type і state, а далі викликаємо якусь специфічну бізнес-функцію нашого домену, яка й опрацьовує команду. Для конкретної команди ми відправляємо сповіщення через SignalR про успішне виконання. Тобто front-end або мобільний застосунок отримує сповіщення, що команда виконана і він може виконувати наступну дію. Далі створюється інтеграційна подія, яка відправляється на шину подій.
[FunctionName(nameof(CommandHandlerFunctions))] public async Task HandleAsync([QueueTrigger("commands-queue")] Message message) { var cmd = message.GetCommand(); var ar = await context.AggregateAsync<TRoot, TState>(cmd.RootId, cmd.ExVersion); if(ar == null) { return NotifySubscription(cmd, FailedResult.NotFound); // SignalR. } var result = await ar.SomeDomainSpecificMethod(cmd); if (!result.Success) { return result.ForCommand<TCommand>().NotifySubscription(); // SignalR. } var integrationEvent = context.CreateIntegrationEvent<TEvent>(); await eventHub.PublishAsync(integrationEvent); }
Як працює Aggregate Root?
Aggregate Root
class TestRoot : AggregateRoot<TestState> public IExecutionResult Update(TestCommand cmd) { if (cmd.TestData.Length < 10) { return ValidationFailed(CoreErrorCodes.Failed) } cmd.TestData = cmd.TestData; // Compile fail. PublishEvent(new TestEvent { TestData = cmd.TestData, }); return Success(); }
Aggregate State
public class TestState : AggregateState, IEventApplier<TestEvent>, { public TestState() { Handles<TestEvent>(Apply); } public string TestData { get; private set; } public void Apply(TestEvent @event) { TestData = @event.TestData; }
Всередині агрегату логіки дуже прості. Це об’єкт, який складається з двох частин. Одна частина — це Aggregate State, який має приватні поля та сектори і все, що він може зробити, це застосувати події, які прийшли на ці поля. Цей об’єкт використовується як generic-тип для Aggregate-об’єкта. Ми маємо доступ прочитати ці поля через get, але не можемо їх змінити. Це значно зменшує кількість помилок, адже розробник пише чисту логіку з чистою перевіркою бізнес-правил і ніяк не може змінити state.
Handle event
[FunctionName(nameof(CommandHandlerFunctions))] public async Task HandleAsync([EventHubTrigger(“events-topic")] EventData data) { var myEvent = data.GetEvent(); var writer = await store.GetWriter<TQuery>(myEvent.RootId, context); var model = await writer.ProjectAsync(myEvent, view => { view.SomeUIField = myEvent.NewValue; view.UpdatedBy = myEvent.Actor.UserName; // and many more... }); return NotifySubscription(new CompletedResult(model)); // SignalR. }
Перейдемо в Event Handler. Тут все набагато простіше. Все, що ми можемо тут робити, це записувати дані. Наприклад, за Aggregate ID або іншою ознакою, яку ми визначаємо для об’єкта query. В Еvent Handler-і в нас є сотні таких методів проєкції даних, кожен з яких проєктує певний вид даних, який використовується на вебклієнті, мобільному застосунку або в сторонній системі. Далі дані з події проєктуються на конкретні дані виду. Після виконання проєкцій в базу даних читання, ми знову ж таки відправляємо сповіщення по SignalR для клієнта.
І далі переходимо на вичитку даних з Cosmos DB. Оскільки у нас read-модель уже збережена в документі, то все, що нам потрібно зробити, це отримати read-модель.
Read query
public async Task<IActionResult> Get(string id) { var view = queryProvider.GetById<MyQueryModel>(id); return Ok(view); } public MyQueryModel : IMutableQueryModel { public int Version { get; set; } // used for command on related aggregate public string RootId { get; set; } public List<SomeFragment> Items { get; set; } public SomeClass Nested { get; set; } }
Цей код є на app сервісі, як GET метод отримання конкретної Query моделі. Модель створюється так, щоб її повністю вистачало для роботи клієнтові. Наприклад, якщо клієнт відображає сторінку з якимось профілем користувача і далі має ще список чогось, то йому не потрібно на цей список робити окремі запити.
Тестування систем з CQRS-архітектурою
Ми розібрали основний процес. А тепер — низка бонусних рекомендацій, як ефективно використовувати цей шаблон в Azure. Почнемо з тестування.
How to test it?
// Prepare query storage var queryStore = new InProcessQueryStore<TestQueryModel>(tables); // Attach event handlers to query store var eventHandlers = new DemoEventHandlers(queryStore); // Create event bus and subscribe handlers to it var eventBus = new InProcessEventBus(); eventBus.AddAllHandlers(eventHandlers); // Register event store to populate events into event bus var eventStore = new InProcessEventStore(); // Attach event store to command handlers var commandHandlers = new DemoCommandHandlers(eventStore, eventBus); // Create command bus and subscribe command handlers commandBus = new InProcessCommandBus(); commandBus.AddAllHandlers(commandHandlers);
Найефективніший підхід для тестування такого ряду систем з CQRS-архітектурою — створення In Process Test Framework за допомогою інтеграційних, або юніт-тестів. Це допоможе не розгортати велику кількість компонентів, а протестувати лише цю виділену пам’ять і за лічені секунди пройти всі необхідні тести бізнес-логіки. Усе, що роблять інтеграційні тести в цьому випадку — піднімають In Process, In Memory всі необхідні компоненти для CQRS, а нам потрібно лише створити команду.
How to test it?
// Send command to create aggregate var createCommand = new TestCreateCommand { TestData = "Test data", AggregateRootId = Unified.NewCode() }; createCommand.Wrap(); var createResult = commandBus.PublishAsync(createCommand).Result; // Get read query var queryResult = queryReader.GetOneAsync(aggregateId).Result; Assert.Equal(createCommand.TestData, queryResult.TestData);
У цьому разі Wrap-метод робить те саме, що і метод, який пов’язує команду з додатковими даними з JWT token. По суті, він бере контекст виконання, зокрема інформацію про користувача, який авторизований, його права тощо. Після цього ми команду відправляємо просто в шину (ESB). В такій ситуації в InMemory і вже з Query моделі або через підписку можемо отримати подію про виконання або просто прочитати ці дані.
А тепер кілька слів про інші види тестів. InMemory Framework — це добре, але в Cloud компоненти можуть поводитися по-іншому. Тому ми завжди частину тестів додаємо з можливістю запуску на реальних компонентах. Такі тести набагато повільніші, але добре працюють локально. На самому CICD ми запускаємо обидва види тестів: як інтеграційні юніт-тести, так і тести з реальними компонентами. Останні вже в Azure не на емуляторах, розгортають справжні сервіси через ARM template або Terraform і запускають на цих ресурсах тести. Після цього ці ресурси видаляються, оскільки на кожен тест нам потрібне ізольоване одноразове середовище, щоб кожен раз тести проходили успішно. Найкращим інструментом для подібного моніторингу є Application Insights, де ми проводимо кореляцію
Основна рекомендація щодо використання Application Insights — впевнитися, що у вас короткий час зберігання, зокрема на стейджі. Архітектура CQRS з Azure Funсtions може згенерувати сотні гігабайтів даних за місяць, і врешті Application Insights може стати набагато дорожчим, ніж сама інфраструктура Azure Funсtions. За замовчуванням Application Insights відстежує всі дані 90 днів, я ж раджу скоротити цей період або встановити обмеження на об’єм у 5 чи 10 Гб.
Приклад структури проєкту
Вище — приклад структури одного з наших проєктів. Ми використовуємо Multi-repo-підхід, коли в кожному репозиторії в нас є один повністю ізольований мікросервіс з усіма необхідними компонентами в одному репозиторії. Це:
- інфраструктурний код (arm template або terraform);
- Azure Function проєкти для Common Handler, Event Handler;
- контракти.
Контракти публікуються як NuGet пакет і використовуються іншими частинами системи, сервісами та клієнтами. Там є всі необхідні компоненти для зв’язку: події, команди та запити, — усе, що виходить за рамки сервісу.
Azure Function Host Configuration
public void Configure(IFunctionsHostBuilder builder) { var configuration = BuildConfiguration(); var ingrowthConfig = new IngrowthConfig(); configuration.Bind(ingrowthConfig); services.AddSingleton<TelemetryClient>(); services.AddTransient<IAggregateStore, AggregateStore>(); services.AddSingleton<IEventStore>(_ => new StateEventTableStore(CreateTableStorageOptions()); services.AddSingleton<ITransactionStore>(_ => new TransactionTableStore()); services.AddSingleton<IIntegrationEventStore>(_ => new IntegrationEventTableStore()); services.AddSingleton<IReportBus>(_ => new AzureServiceTopicReportBus()); services.AddSingleton<IEventBus>(_ => new AzureServiceTopicEventBus()); // Registering cloud storage account services.AddSingleton(CloudStorageAccount.Parse(ingrowthConfig.StorageAccountConnectionString));
Для спрощення роботи з Azure Functions ми зазвичай створюємо шаблон стартапу, який використовується всіма мікросервісами. Процес формування бізнес-об’єктів в Azure Functions виглядає дуже просто: є звичайна точка входу, є Functions стартап, який запускає цю конфігурацію, та тригери. Їх може бути багато. Тригери отримують події, десереалізують дані, а далі запускається той самий MediatR, який ці події віддає вже конкретним обробникам і агрегатам.
У самому Azure кожен мікросервіс ізольований всередині ресурсної групи, і ми маємо всі компоненти, які необхідні для сервісу. З одного репозиторію все розгортається в одну ресурсну групу. Це значно спрощує менеджмент такої інфраструктури. Тут у нас є Command Handler, Event Handler і Process Manager, який виконує роль синхронізації Saga, і сховище. Для рішень з більш високим навантаженням ми використовуємо Cosmos DB.
Щодо вартості: на цьому внутрішньому проєкті вартість середовища — близько 50$. Додатково ми запускаємо тут усі e2e тести. У вартість входять різні інтеграційні сервіси і сам app сервіс, оскільки він не безсерверний. Azure Functions, своєю чергою, коштують менше як 3$, при тому, що обробляють понад 15 млн запитів кожного місяця. Це чи не найдешевший обчислювальний ресурс, який зараз є в Azure, і безкоштовний для кількості запитів до 1 млн, або до 400 TB/s.
Також я підготував проєкт з відкритим вихідним кодом на GitHub, де можна переглянути загальні абстракції CQRS-фреймворку. Усе написано на .NET Standard, тому може бути використано не тільки в Azure Functions, а на будь-якому .NET-проєкті.
Що робити із cold start?
Azure Functions Cold start
/// <summary> /// WarmerTrigger - keep func always loaded. /// </summary> /// <param name="timer">Timer trigger info.</param> /// <returns>A task representing the asynchronous operation.</returns> [FunctionName("WarmerTrigger")] [Disable(“Option_WarmerDisable")] public async Task WarmAsync([TimerTrigger("0 */4 * * * *")]TimerInfo timer) { if (timer.IsPastDue) { log.LogInformation("Warmer is running late!"); } await Task.Delay(0); }
І ще порада для тих, хто хоче використовувати Consumption Plan (план споживання, який автоматично розподіляє обчислювальну потужність, коли код працює, — ред.). Для нього характерна така проблема, як холодний старт. Її можна по-різному лагодити, але повністю вирішити не вдасться. Найпростіший варіант — додавати таймер-тригер. Він запускається кожні 4 хвилини й нічого не виконує. Це просто сигнал для хосту, що тут є повторюваний тригер, який забезпечує постійне утримання пакету Azure Functions на хості. Це значно зменшує час очікування на запуску. Тоді замість
Event Sourcing — дещо складніше за CQRS
Те, що CQRS дуже складний, — це міф. Насправді CQRS дуже простий у порівнянні з Event Sourcing (шаблон проєктування, який представляє стан об’єкта у вигляді множини подій, — ред.). Якщо ви не впевнені щодо використання CQRS, рекомендую почати його використовувати без сорсингу подій, на стаціонарних моделях, на write-частині. З Event Sourcing є кілька складнощів:
- Проблема підтримки версійності подій. Події просто так видалити не можна: всі старі версії потрібно зберігати, щоб мати можливість зробити повторення системи.
- Необхідність збереження всієї бізнес-логіки з версіями подій. Навіть якщо ця бізнес-логіка вже застаріла, її треба підтримувати. Інакше результат виконання буде відрізнятися.
Для вирішення цих проблем можна застосувати кілька підходів, на жаль, усі вони неідеальні. Це мапінг старих версій подій на нові та їхній запуск через ту саму логіку; запуск різних пакетів логіки для різних подій; мутації подій, коли вони переміщуються (найменш популярний підхід, допустимий у звичайних SQL-базах, але не у Event Sourcing, де неприпустима міграція даних).
Тому, якщо у вас буде бажання спробувати CQRS, рекомендую почати з CQS-підходу без сорсингу подій.
Що почитати
І наостанок — список книг, які я рекомендую.
Ерік Еванс є автором предметно-орієнтованого проєктування, тому варто прочитати його Domain Driven Design. Метод штурму подій пішов так само від DDD, тож це обов’язково для тих, хто буде займатися мікросервісами і CQRS.
Building Microservices: Designing Fine-Grained Systems Сема Ньюмана про побудову мікросервісів.
Exploring CQR and Event Journey від Microsoft — ще одна хороша стартова книга з CQRS.
З ресурсів найбільш цінним мені здається Greg Yang. Він містить публікації про CQRS і чудовий продукт — Event Store. Це база даних, як саме створена для збереження подій в Event Source.
Дякую, що дочитали до кінця, радий відповісти на запитання.
43 коментарі
Додати коментар Підписатись на коментаріВідписатись від коментарів