Як масштабувати мікросервіси в Azure за допомогою принципу CQRS

Підписуйтеся на Telegram-канал «DOU #tech», щоб не пропустити нові технічні статті

Я Сергій Селецький, Senior Solution Architect в компанії Intellias та експерт в Centers of Excellence — підрозділі, який займається підвищенням кваліфікації інженерів і розробки загалом. Автор Intellias CQRS фреймворку, який використовується на декількох проєктах в Intellias. Маю понад 12 років досвіду роботи з .NET та Azure Cloud.

У цій статті я розповім про принцип імперативного програмування CQRS, його походження, складові та практичне використання для масштабування мікросервісів в Azure. Розглянемо реальні приклади структури проєкту та поговоримо про тестування систем з CQRS-архітектурою.

CQRS: огляд та теорія

Перш ніж говорити про CQRS, згадаймо, звідки він походить, а саме CQS. Це принцип імперативного програмування та розподілення команд і запитів. Зазвичай, він використовується через бібліотеку MediatR в .NET і все працює в рамках одного застосунку. З іншого боку, CQRS — це той самий CQS патерн, але використовується він на рівні архітектури всієї системи.

У CQRS система розділяється на дві частини: write та read. Зазвичай на write-частині розташована база даних, яка забезпечує цілісність даних, а на read-частині є інша база даних, яка витримує високе навантаження на читання і добре масштабується. Дві частини об’єднуються через Service Bus, Event Hub або Kafka. Поверх них розташований фасад API Gateway, який агрегує всі API, тому для кінцевого користувача система виглядає як звичайний мікросервіс чи моноліт.

Write-частина працює так: у REST Controller створюється команда, до якої додається необхідна інформація — контекст ідентифікатора користувача тощо. Після цього команда запускає Command Handler, де ми опрацьовуємо команду і створюємо подію. Остання зі свого боку зберігається в Event Store. У той самий час на read-частині у нас є Event Handler, який слухає чергу подій і застосовує подію на саму Query модель читання даних.

Перевага такої моделі полягає в тому, що ми можемо нескінченно масштабувати read-частину, додаючи все більше і більше інстансів.

Крім того, якщо ви знайомі з САР-теоремою, то знаєте, що ми можемо використовувати дві парадигми кожної частини системи на свою користь. Наприклад, дані на write-частині є цілісними та доступними, тож ми маємо змогу вибрати необхідні бази даних, з ASID транзакційністю тощо. У той самий час можемо забезпечити хороший partition tolerance на read-частині. Тобто для кожної потреби системи можемо обрати кращі характеристики.

Використання CQRS-системи на практиці

Познайомившись з теорією, переходимо до практики на платформі Azure.

Це спрощена діаграма, адже в реальних проєктах використовуються десятки додаткових сервісів: Azure FrontDoor, Traffic Manager тощо. Але такої діаграми цього достатньо, щоб продемонструвати роботу системи.

Все починається із запиту від застосунку Angular, який спрямовує запит до API Management та конвертує його у команду. У цей час відбувається валідація. Наприклад, якщо ви вводите 100 000 знаків у поле, яке розраховане на 250, такий запит не буде відправлено на конвертацію. Потім конвертована команда відправляється в командну шину. Ми надаємо клієнтові унікальний код команди, за яким її можна відслідковувати. Далі команда активується і виконується в Command Handler, події потрапляють на Event Handler. Після цього клієнтові приходить сповіщення, що операцію завершено.

Розглянемо це більш детально на одному сервісі.

Зауважте, що архітектура сервісу дещо змінена. Тут замість API Management використовується app service. Він дає нам додаткові переваги, а саме зменшує затримки та дозволяє використовувати інші дуплексні протоколи, крім REST та GraphQL.

Коли на app сервіс приходить запит, в роботу вмикається пустий Controller. Все, що він робить, — валідує атрибути на ту модель, яка надходить, та додає додаткову інформацію з http контексту.

public async Task<IActionResult> Post(MyActionModel model)
{
var command = model.Map<TCommand>(); // AutoMapper.
await EnrichCommandAsync(command, httpContext); // Add JWT and telemetry data from HTTP.
var validationResult = command.Validate(); // Contract validation, simple rules, No BL.
if (validationResult.Success)
{
validationResult = await validator.VerifyAsync(command); // Constraints validation.
}
if (!validationResult.Success)
{
return NotifySubscription(command, (FailedResult)validationResult); // SignalR.
}
await commandQueue.PublishAsync(command); // Publish to storage queue.
return Ok(command.CorrelationId); // Return ID for tracking result.
}

Це може бути інформація з JWT-токена, дані про користувачів, їхня IP-адреса, локація тощо. Після цього команда відправляється на командну шину. Тут ми можемо зробити системну валідацію, адже в самих Command Handler-ах немає міжсервісного контексту. Тому, якщо ми хочемо, перевірити, наприклад, унікальність імені в системі або ж подивитись, чи провалідовані атрибути, це треба робити тут.

Далі переходимо до Command Handler. Тут є звичайний Azure Function Trigger, в якому ми розпаковуємо команду. За допомогою GetCommand методу ми десерилізовуємо команду та отримуємо вже сам об’єкт команди, який далі обробляється в Command Handler-і. Інакше кажучи, ми відтворюємо Aggregate Root бізнес-логіки, передаючи type і state, а далі викликаємо якусь специфічну бізнес-функцію нашого домену, яка й опрацьовує команду. Для конкретної команди ми відправляємо сповіщення через SignalR про успішне виконання. Тобто front-end або мобільний застосунок отримує сповіщення, що команда виконана і він може виконувати наступну дію. Далі створюється інтеграційна подія, яка відправляється на шину подій.

[FunctionName(nameof(CommandHandlerFunctions))]
public async Task HandleAsync([QueueTrigger("commands-queue")] Message message)
{
var cmd = message.GetCommand();
var ar = await context.AggregateAsync<TRoot, TState>(cmd.RootId, cmd.ExVersion);
if(ar == null)
{
return NotifySubscription(cmd, FailedResult.NotFound); // SignalR.
}
var result = await ar.SomeDomainSpecificMethod(cmd);
if (!result.Success)
{
return result.ForCommand<TCommand>().NotifySubscription(); // SignalR.
}
var integrationEvent = context.CreateIntegrationEvent<TEvent>();
await eventHub.PublishAsync(integrationEvent);
}

Як працює Aggregate Root?

Aggregate Root

class TestRoot : AggregateRoot<TestState>
public IExecutionResult Update(TestCommand cmd)
{
if (cmd.TestData.Length < 10)
{
return ValidationFailed(CoreErrorCodes.Failed)
}
cmd.TestData = cmd.TestData; // Compile fail.
PublishEvent(new TestEvent
{
TestData = cmd.TestData,
});
return Success();
}

Aggregate State

public class TestState : AggregateState,
IEventApplier<TestEvent>,
{
public TestState()
{
Handles<TestEvent>(Apply);
}
public string TestData { get; private set; }
public void Apply(TestEvent @event)
{
TestData = @event.TestData;
}

Всередині агрегату логіки дуже прості. Це об’єкт, який складається з двох частин. Одна частина — це Aggregate State, який має приватні поля та сектори і все, що він може зробити, це застосувати події, які прийшли на ці поля. Цей об’єкт використовується як generic-тип для Aggregate-об’єкта. Ми маємо доступ прочитати ці поля через get, але не можемо їх змінити. Це значно зменшує кількість помилок, адже розробник пише чисту логіку з чистою перевіркою бізнес-правил і ніяк не може змінити state.

Handle event

[FunctionName(nameof(CommandHandlerFunctions))]
public async Task HandleAsync([EventHubTrigger(“events-topic")] EventData data)
{
var myEvent = data.GetEvent();
var writer = await store.GetWriter<TQuery>(myEvent.RootId, context);
var model = await writer.ProjectAsync(myEvent, view => {
view.SomeUIField = myEvent.NewValue;
view.UpdatedBy = myEvent.Actor.UserName;
// and many more...
});
return NotifySubscription(new CompletedResult(model)); // SignalR.
}

Перейдемо в Event Handler. Тут все набагато простіше. Все, що ми можемо тут робити, це записувати дані. Наприклад, за Aggregate ID або іншою ознакою, яку ми визначаємо для об’єкта query. В Еvent Handler-і в нас є сотні таких методів проєкції даних, кожен з яких проєктує певний вид даних, який використовується на вебклієнті, мобільному застосунку або в сторонній системі. Далі дані з події проєктуються на конкретні дані виду. Після виконання проєкцій в базу даних читання, ми знову ж таки відправляємо сповіщення по SignalR для клієнта.

І далі переходимо на вичитку даних з Cosmos DB. Оскільки у нас read-модель уже збережена в документі, то все, що нам потрібно зробити, це отримати read-модель.

Read query

public async Task<IActionResult> Get(string id)
{
var view = queryProvider.GetById<MyQueryModel>(id);
return Ok(view);
}

public MyQueryModel : IMutableQueryModel
{
public int Version { get; set; } // used for command on related aggregate
public string RootId { get; set; }
public List<SomeFragment> Items { get; set; }
public SomeClass Nested { get; set; }
}

Цей код є на app сервісі, як GET метод отримання конкретної Query моделі. Модель створюється так, щоб її повністю вистачало для роботи клієнтові. Наприклад, якщо клієнт відображає сторінку з якимось профілем користувача і далі має ще список чогось, то йому не потрібно на цей список робити окремі запити.

Тестування систем з CQRS-архітектурою

Ми розібрали основний процес. А тепер — низка бонусних рекомендацій, як ефективно використовувати цей шаблон в Azure. Почнемо з тестування.

How to test it?

// Prepare query storage
var queryStore = new InProcessQueryStore<TestQueryModel>(tables);
// Attach event handlers to query store
var eventHandlers = new DemoEventHandlers(queryStore);
// Create event bus and subscribe handlers to it
var eventBus = new InProcessEventBus();
eventBus.AddAllHandlers(eventHandlers);
// Register event store to populate events into event bus
var eventStore = new InProcessEventStore();
// Attach event store to command handlers
var commandHandlers = new DemoCommandHandlers(eventStore, eventBus);
// Create command bus and subscribe command handlers
commandBus = new InProcessCommandBus();
commandBus.AddAllHandlers(commandHandlers);

Найефективніший підхід для тестування такого ряду систем з CQRS-архітектурою — створення In Process Test Framework за допомогою інтеграційних, або юніт-тестів. Це допоможе не розгортати велику кількість компонентів, а протестувати лише цю виділену пам’ять і за лічені секунди пройти всі необхідні тести бізнес-логіки. Усе, що роблять інтеграційні тести в цьому випадку — піднімають In Process, In Memory всі необхідні компоненти для CQRS, а нам потрібно лише створити команду.

How to test it?

// Send command to create aggregate
var createCommand = new TestCreateCommand
{
TestData = "Test data",
AggregateRootId = Unified.NewCode()
};
createCommand.Wrap();
var createResult = commandBus.PublishAsync(createCommand).Result;
// Get read query
var queryResult = queryReader.GetOneAsync(aggregateId).Result;
Assert.Equal(createCommand.TestData, queryResult.TestData);

У цьому разі Wrap-метод робить те саме, що і метод, який пов’язує команду з додатковими даними з JWT token. По суті, він бере контекст виконання, зокрема інформацію про користувача, який авторизований, його права тощо. Після цього ми команду відправляємо просто в шину (ESB). В такій ситуації в InMemory і вже з Query моделі або через підписку можемо отримати подію про виконання або просто прочитати ці дані.

А тепер кілька слів про інші види тестів. InMemory Framework — це добре, але в Cloud компоненти можуть поводитися по-іншому. Тому ми завжди частину тестів додаємо з можливістю запуску на реальних компонентах. Такі тести набагато повільніші, але добре працюють локально. На самому CICD ми запускаємо обидва види тестів: як інтеграційні юніт-тести, так і тести з реальними компонентами. Останні вже в Azure не на емуляторах, розгортають справжні сервіси через ARM template або Terraform і запускають на цих ресурсах тести. Після цього ці ресурси видаляються, оскільки на кожен тест нам потрібне ізольоване одноразове середовище, щоб кожен раз тести проходили успішно. Найкращим інструментом для подібного моніторингу є Application Insights, де ми проводимо кореляцію ID-операції через усі сервіси — через Common Handler, Azure functions, API management, — та всі чeрги: зокрема Event Hubs, та активацію самих Event Handlers.

Основна рекомендація щодо використання Application Insights — впевнитися, що у вас короткий час зберігання, зокрема на стейджі. Архітектура CQRS з Azure Funсtions може згенерувати сотні гігабайтів даних за місяць, і врешті Application Insights може стати набагато дорожчим, ніж сама інфраструктура Azure Funсtions. За замовчуванням Application Insights відстежує всі дані 90 днів, я ж раджу скоротити цей період або встановити обмеження на об’єм у 5 чи 10 Гб.

Приклад структури проєкту

Вище — приклад структури одного з наших проєктів. Ми використовуємо Multi-repo-підхід, коли в кожному репозиторії в нас є один повністю ізольований мікросервіс з усіма необхідними компонентами в одному репозиторії. Це:

  • інфраструктурний код (arm template або terraform);
  • Azure Function проєкти для Common Handler, Event Handler;
  • контракти.

Контракти публікуються як NuGet пакет і використовуються іншими частинами системи, сервісами та клієнтами. Там є всі необхідні компоненти для зв’язку: події, команди та запити, — усе, що виходить за рамки сервісу.

Azure Function Host Configuration

public void Configure(IFunctionsHostBuilder builder)

{
var configuration = BuildConfiguration();
var ingrowthConfig = new IngrowthConfig();
configuration.Bind(ingrowthConfig);
services.AddSingleton<TelemetryClient>();
services.AddTransient<IAggregateStore, AggregateStore>();
services.AddSingleton<IEventStore>(_ => new StateEventTableStore(CreateTableStorageOptions());
services.AddSingleton<ITransactionStore>(_ => new TransactionTableStore());
services.AddSingleton<IIntegrationEventStore>(_ => new IntegrationEventTableStore());
services.AddSingleton<IReportBus>(_ => new AzureServiceTopicReportBus());
services.AddSingleton<IEventBus>(_ => new AzureServiceTopicEventBus());
// Registering cloud storage account
services.AddSingleton(CloudStorageAccount.Parse(ingrowthConfig.StorageAccountConnectionString));

Для спрощення роботи з Azure Functions ми зазвичай створюємо шаблон стартапу, який використовується всіма мікросервісами. Процес формування бізнес-об’єктів в Azure Functions виглядає дуже просто: є звичайна точка входу, є Functions стартап, який запускає цю конфігурацію, та тригери. Їх може бути багато. Тригери отримують події, десереалізують дані, а далі запускається той самий MediatR, який ці події віддає вже конкретним обробникам і агрегатам.

У самому Azure кожен мікросервіс ізольований всередині ресурсної групи, і ми маємо всі компоненти, які необхідні для сервісу. З одного репозиторію все розгортається в одну ресурсну групу. Це значно спрощує менеджмент такої інфраструктури. Тут у нас є Command Handler, Event Handler і Process Manager, який виконує роль синхронізації Saga, і сховище. Для рішень з більш високим навантаженням ми використовуємо Cosmos DB.


Щодо вартості: на цьому внутрішньому проєкті вартість середовища — близько 50$. Додатково ми запускаємо тут усі e2e тести. У вартість входять різні інтеграційні сервіси і сам app сервіс, оскільки він не безсерверний. Azure Functions, своєю чергою, коштують менше як 3$, при тому, що обробляють понад 15 млн запитів кожного місяця. Це чи не найдешевший обчислювальний ресурс, який зараз є в Azure, і безкоштовний для кількості запитів до 1 млн, або до 400 TB/s.

Також я підготував проєкт з відкритим вихідним кодом на GitHub, де можна переглянути загальні абстракції CQRS-фреймворку. Усе написано на .NET Standard, тому може бути використано не тільки в Azure Functions, а на будь-якому .NET-проєкті.

Що робити із cold start?

Azure Functions Cold start

/// <summary>
/// WarmerTrigger - keep func always loaded.
/// </summary>
/// <param name="timer">Timer trigger info.</param>
/// <returns>A task representing the asynchronous operation.</returns>
[FunctionName("WarmerTrigger")]
[Disable(“Option_WarmerDisable")]
public async Task WarmAsync([TimerTrigger("0 */4 * * * *")]TimerInfo timer)
{
if (timer.IsPastDue)
{
log.LogInformation("Warmer is running late!");
}
await Task.Delay(0);
}

І ще порада для тих, хто хоче використовувати Consumption Plan (план споживання, який автоматично розподіляє обчислювальну потужність, коли код працює, — ред.). Для нього характерна така проблема, як холодний старт. Її можна по-різному лагодити, але повністю вирішити не вдасться. Найпростіший варіант — додавати таймер-тригер. Він запускається кожні 4 хвилини й нічого не виконує. Це просто сигнал для хосту, що тут є повторюваний тригер, який забезпечує постійне утримання пакету Azure Functions на хості. Це значно зменшує час очікування на запуску. Тоді замість 10–15 секунд, які зазвичай займає холодний старт з нуля, обробка тригера триває 100–300 мілісекунд.

Event Sourcing — дещо складніше за CQRS

Те, що CQRS дуже складний, — це міф. Насправді CQRS дуже простий у порівнянні з Event Sourcing (шаблон проєктування, який представляє стан об’єкта у вигляді множини подій, — ред.). Якщо ви не впевнені щодо використання CQRS, рекомендую почати його використовувати без сорсингу подій, на стаціонарних моделях, на write-частині. З Event Sourcing є кілька складнощів:

  • Проблема підтримки версійності подій. Події просто так видалити не можна: всі старі версії потрібно зберігати, щоб мати можливість зробити повторення системи.
  • Необхідність збереження всієї бізнес-логіки з версіями подій. Навіть якщо ця бізнес-логіка вже застаріла, її треба підтримувати. Інакше результат виконання буде відрізнятися.

Для вирішення цих проблем можна застосувати кілька підходів, на жаль, усі вони неідеальні. Це мапінг старих версій подій на нові та їхній запуск через ту саму логіку; запуск різних пакетів логіки для різних подій; мутації подій, коли вони переміщуються (найменш популярний підхід, допустимий у звичайних SQL-базах, але не у Event Sourcing, де неприпустима міграція даних).

Тому, якщо у вас буде бажання спробувати CQRS, рекомендую почати з CQS-підходу без сорсингу подій.

Що почитати

І наостанок — список книг, які я рекомендую.

Ерік Еванс є автором предметно-орієнтованого проєктування, тому варто прочитати його Domain Driven Design. Метод штурму подій пішов так само від DDD, тож це обов’язково для тих, хто буде займатися мікросервісами і CQRS.

Building Microservices: Designing Fine-Grained Systems Сема Ньюмана про побудову мікросервісів.

Exploring CQR and Event Journey від Microsoft — ще одна хороша стартова книга з CQRS.

З ресурсів найбільш цінним мені здається Greg Yang. Він містить публікації про CQRS і чудовий продукт — Event Store. Це база даних, як саме створена для збереження подій в Event Source.

Дякую, що дочитали до кінця, радий відповісти на запитання.

👍ПодобаєтьсяСподобалось12
До обраногоВ обраному13
LinkedIn
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter
Дозволені теги: blockquote, a, pre, code, ul, ol, li, b, i, del.
Ctrl + Enter

Еще не совсем понятно про параллельность и холодный старт, вы читаете Commands и Integration Events по одному, только в одной Azure Function?
Потому что если если в очередь зайдет второй ивент, пока функция обрабатывает первый, то я так понимаю должно создать вторую функцию, и это будет холодный старт?

Зачем вам Azure Functions c Command Header-рами и Commands Queue если у вас уже есть AppService и можно всю логику сделать на нем, в рамках одного приложения? Как и рекомендуют в книге .NET Microservices.
Еще не совсем понял как у вас вся инфраструктура стоит 50$ в месяц, если самый девешвый продакшен AppService, с всего 3.5 GB памяти, стоит 70$?

EventDB — что вы там считывате каждый раз? у вас же есть AggregateSnapshot чтобы все каждый раз не воспроизводить?
EventHandler — что вы там записываете? воспроизвеленный аггрегат? откуда он приходит, через EventHub?
У вас не проблем с масштабированием когда несколько инстансов EventHandlera будет работать, а EventHub не обеспечивает FIFO и воспроизведенные агрегаты могут быть на EventHandler обработаны в произвольном порядке и в произвольном порядке записываться в базу? или вы привязываетесь к PartitionId для обеспечения FIFO?

Почему в проектах и в вакансиях которые можно найти по Украине приемущественно используют Angular если исходя из статистики глобальной в мире React лидирует?

Статья отличная. Спасибо!

<придирка, просьба строго не судить >
Список литературы можно было бы просто текстом а не картинкой... без ссылок на книги а просто текстом, так проще было бы искать книги
</придирка, просьба строго не судить >

Greg Young — Building an Event Storage
Microsoft — Exploring CQRS and Event Sourcing
Greg Young — CQRS & Event Sourcing
Lorenzo Nicora — A visual introduction to event sourcing and cqrs
Greg Young — A Decade of DDD, CQRS, Event Sourcing
Martin Fowler — Event Sourcing
Eric Evans — DDD and Microservices: At Last, Some Boudaries!
Martin Kleppmann — Event Sourcing and Stream Processing at Scale
Julie Lerman — Data Points — CQRS and EF Data Models
Vaughn Vernon — Reactive DDD: Modeling Uncertainty
Mark Seemann — CQS versus server generated IDs
Udi Dahan — If (domain logic) then CQRS, or Saga?
Event Store — The open-source, functional database with Complex Event Processing in JavaScript
Pedro Costa — Migrating to Microservices and Event-Sourcing: the Dos and Dont’s
David Boike — Putting your events on a diet
DDD Quickly
Dennis Doomen — The Good, The Bad and the Ugly of Event Sourcing
Liquid Projections — A set of highly efficient building blocks to build fast autonomous synchronous and asynchronous projector

спасибо за статью

Дякую. Щось по структурі проекту Мікросервіса не бачу, щоб використовувалось DDD :)

await EnritchCommandAsync(command, httpContext);

- Enritch instead f Enrich. Сама по собі назва дуже загальна і нічого не говорить.

if(ar == null)
{
return NotifySubscription(command, FailedResult.NotFound); // SignalR.
}

— ar чудова назва змінної)
— command взагалі немає в блоці, є cmd.

Необхідність збереження всієї бізнес-логіки з версіями подій.

— дуже важливий момент, через який реально добре зважити чи варто ввязуватись в той Event Sourcing для певних проектів.
Цікаво як ви генеруєте CorrelationId ?

— дуже важливий момент, через який реально добре зважити чи варто ввязуватись в той Event Sourcing для певних проектів.

есть много стратегий работы с версионированием состояния описанных самым Грэгом и последователями на примере работы с event store — держать все версии бизнесс логики не есть строгим требованием к event sourced системам.

CorrelationId генерує Application Insights ще на клієнті і проходить через все end-to-end. код в power point під статтю редагувався, трохи помилок компіляції може бути )

папка з DDD агрегатами в проекті Command Handler. самі абстракції тут: github.com/...​ntellias.CQRS.Core/Domain

Дякую. Звірьок досить не малий вийшов. На GitHub пачка інших значно простіших проектів по CQRS.

Якщо інфраструктуру втягувати в базовий проект, то істота завжди величенька буде

Привет, о ажур, тут щас столкнулся, как изменить локацию на нем?

Ну просто архітект від Бога: закидав все глючним кодом замість щоб розкрити архітектуру починаючи з верхнього рівня :)

— как concurrency ваш фреймворк решает на записи?

— для чего все разворачивать на отдельных azure сервисах с таким количеством инфраструктуры, если можно все это развернуть c медиатром внутри процесса, с вашей нагрузкой особенно 15 млн запросов в месяц?

— зачем решение с веб-сокетами/пуллингом для не real-time браузерного решения или это чисто что бы работал CQRS и были микросервисы?

await EnritchCommandAsync(command, httpContext); // Add JWT and telemetry data from HTTP.
ASID

вам надо на вычитку статью\код.

MediatR це більше CQS, Фаулер чітко описав що має бути фізичне розділення баз данних та серверу для CQRS, у нас realtime, є колективне редагування як в google docs, всі зміни в реальному часі, і всі юзери бачать будь як зміни в даних відразу без рефрешів.

Я могу ваш фреймворк юзать с любым протоколом интеграции клиента и сервера в том числе синхронными request-response, например HTTP? как в таком случае происходит получение ответа браузером?

и хотелось бы узнать вопрос по concurrency handling при обработке ивент хендлером события. как обеспечивается целостность состояния агрегата в concurrent сценариях.

при синхронному буде await на service-bus на стороні asp.net core gateway. це збільшить навантаження на api сервер. concurrent- кожен агрегат послідовно аплаїть події по версії.

concurrent- кожен агрегат послідовно аплаїть події по версії.

а как это технологически решается?

а как это технологически решается?

Один из простых кейсов (не относится к статье и фреймворку напрямую): на API уровне читаем версию из read model’и, передаём дальше в обработчик, и при попытке сделать apply сравниваем версии: если не успела поменяться — апплаим, успела — решаем, что делать. Если у Вас обновление одного и того же агрегата очень страдает от конкурентости (все что-то одно хотят постоянно поменять), то акторы — оч хороший вариант (ещё и кэширование в памяти бесплатное).

Один из простых кейсов (не относится к статье и фреймворку напрямую): на API уровне читаем версию из read model’и, передаём дальше в обработчик, и при попытке сделать apply сравниваем версии: если не успела поменяться — апплаим, успела — решаем, что делать.

это что-то нерабочее совсем с точки зрения concurrency handling так как в optimistic ревизия должна экспоузится на UI вместе с данными — больше похоже на разрешение конфликта через last-write-wins, но ревизия для этого не нужна..

ревизия должна экспоузится на UI вместе с данными

Если по какиим-то причинам надо, пусть экспоузится — этому ничего не мешает. В момент апплая ревизия прописывается в read model — мы же её оттуда и вычитываем.

в optimistic ревизия должна экспоузится на UI

Должна или нет — это обычное бизнес-требование, а не требование optimistic concurrency. В определении даже понятие UI не присутствует, да и зачем оно там...

больше похоже на разрешение конфликта через last-write-wins

Та нет, не похоже).

Если по какиим-то причинам надо, пусть экспоузится — этому ничего не мешает. В момент апплая ревизия прописывается в read model — мы же её оттуда и вычитываем.

я вычитываю юзер пофиль в двумя админами(А, Б) на веб-сайте один меняет имя и в другом меняет имя. при таком подходе я отсылаю оба варианта и оба они по очереди(сначала А, потом Б) вычитывают в обработчике запроса последовательно последнюю актуальную версию документа сохранённую в БД и подставляют ее в апдейт и успешно проводят апдейт(запросы прошли последовательно) и я теряю изменения имени сделанное пользователем А.
Такой подход не решает конфликтов конкуретности.
это неправильное понимание того как работает версионирование документов\обьектов и для чего используеться optimistic.

В определении даже понятие UI не присутствует, да и зачем оно там...

надо внимательней перечитать определение и вникнуть в суть сказанного, само понятие UI не более чем источник изменений и держатель версионированных данных
Before committing, each transaction verifies that no other transaction has modified the data it has read.
Как вы собираетесь проверять, что другая транзакция не внесла изменений в документ — если вы наоборот предлагает читать версию из бд момент апдейта, когда другие сессии уже могли поменять версию и она не соответствует вашей.

надо внимательней перечитать определение и вникнуть в суть сказанного, само понятие UI не более чем источник изменений и держатель версионированных данных
я вычитываю юзер пофиль в двумя админами(А, Б) на веб-сайте один меняет имя и в другом меняет имя. при таком подходе я отсылаю оба варианта и оба они по очереди(сначала А, потом Б) вычитывают в обработчике запроса последовательно последнюю актуальную версию документа сохранённую в БД и подставляют ее в апдейт и успешно проводят апдейт(запросы прошли последовательно) и я теряю изменения имени сделанное пользователем А.

А теперь читаем внимательно цитату из ссылки:

This includes transactions that COMPLETED AFTER THIS TRANSACTION’S START TIME, transactions that are still active at validation time.

То есть, проверяются транзакции, которые уже выполниолись или выполняются в данный момент ПОСЛЕ того, как Ваш второй админ нажал на кнопку. Потому что нажатие на кнопку (а, точнее, момент, когда этот реквест долетит до процессинга на бэкенде) и будет началом второй транзакции. Если первый изменил имя, транзакция прошла за 2 секунды, а второй изменил его через 5 минут, а UI не обновил, и поэтому предыдущее обновление якобы «потерялось» — это вообще не юз-кейс для нашего разговора. Мало того, Вы хоть понимаете, что это вообще не о конкурентности разговор получается?

Как вы собираетесь проверять, что другая транзакция не внесла изменений в документ — если вы наоборот предлагает читать версию из бд момент апдейта, когда другие сессии уже могли поменять версию и она не соответствует вашей.

А если ивент генерируется внешним источником, что тогда? Когда между счетами перебрасываются деньги, то терминал при отправке команды и не собирается считывать никаких версий с последующей передачей их дальше. Или Вы будете во внешний мир свои конкарренси токены отдавать? Интересно, как это укладывается в Вашу картину мира в таком случае. Поэтому предлагаю не выдумывать свои интерпретации, а руководствоваться общей терминологией.

Ну и, чтобы расставить все точки над i: аналогично last write wins — это когда:
1. Админ 1 меняет имя, запускается транзакция 3 секунды.
2. Проходит 1 секунда, Админ 2 меняет имя.
3. На 2й секунде первая транзакция ещё в процессе, а вторая уже валидируется. Координатор (например, база данных) видит конфликт и смотрит, какая транзакция последняя, применяя только её, а предыдущая игнорируется и мы не видим никаких ошибок.

В общем, в Вашем юз-кейсе и конкрарренси конфликта-то нет даже, ну. Какой optimistic, какой last write wins... То, что у Вас лежит на стороне, которая прочитала (пусть UI), вообще не имеет значения.

Если первый изменил имя, транзакция прошла за 2 секунды, а второй изменил его через 5 минут, а UI не обновил, и поэтому предыдущее обновление якобы «потерялось» — это вообще не юз-кейс для нашего разговора. Мало того, Вы хоть понимаете, что это вообще не о конкурентности разговор получается?

это и есть про разрешение конфликтов конкурентного доступа к данным. возьмите любую ситему с версионированием документов — они будут их возвращать в get запросах и ожидать от вас получить их в апдейте запроса.

почему Грэг янг не сделал как вы — а при получении ивента ожидает в паблик контрактах версию от вас, он же мог бы точно так же сделать как вы — перечитать последнюю версию и сам ее из стрима, когда кто-то дернет AppendToStreamAsync :)
developers.eventstore.com/...​-stream-in-a-single-batch
The version at which you expect the stream to be in order that an optimistic concurrency check can be performed. This should either be a positive integer, or one of the constants ExpectedVersion.NoStream, ExpectedVersion.EmptyStream, or to disable the check, ExpectedVersion.Any. See here for a broader discussion of this.

Как вы у себя на проекте такие кейсы решаете/называете? или трете данные об имени пользователя и идете говорите заказчику — что это не concurrency, какие к нам вопросы?:)

это и есть про разрешение конфликтов конкурентного доступа к данным.

Here we go again

Concurrency means multiple computations are happening at the same time.

Соответственно Ваш кейс — это не concurrency. Он был бы concurrency, если бы транзакции пересекались по времени. И конфликт бы поймался тем способом, о котором я писал). Но Вы сами на уровне приложения должны были бы решить, как реагировать на конфликт, потому что это вопрос бизнес-логики, а не фреймворка.

возьмите любую ситему с версионированием документов — они будут их возвращать в get запросах и ожидать от вас получить их в апдейте запроса.

А система с версионированием документов — это дефолтная проблема, которую решает optimistic concurrency? Продолжаем путать бизнес-кейс с общими понятиями.

почему Грэг янг не сделал как вы — а при получении ивента ожидает в паблик контрактах версию от вас, он же мог бы точно так же сделать — перечитать версию и сам ее из стрима :)

А кто Вам сказал, что он не сделал так, как я? Более того, та схема, которую я описываю... Вы не поверите, от кого я её в том числе слышал на видео). Когда он говорил, что валидация по read model’и валидна во многих бизнес-кейсах. Что я и предлагаю. Почему Вы думаете, что при этом туда нужно передавать версию неизвестно откуда (из UI)? А read model — это ни разу не UI или какая-то внешняя система, которая вычитала данные.

Как вы у себя на проекте такие кейсы решаете/называете? или трете данные и идете говорите заказчику — что это не concurrency, какие к нам вопросы?:)

Решаем соответственно бизнес-кейсу. Самая хардкорная версия того, что Вы описываете — это гугл докс. Вот и посмотрите, что там происходит). Реактивный фронт, автомердж. Как Вы такой автомердж сделаете на уровне дженерик фреймворка? Откуда фреймворк знает, как Вы хотите решать эти конфликты?

Concurrency control подразумевает scope. Вы хотите consistency между бэком и фронтом достичь с помощью архитектурного фреймворка? Ну, удачи — Эйнштейн бы плакал). CAP теорема сюда же.

А в Вашем случае сделайте реактивный фронт под конкретный случай и не морочьте кастомеру голову).

Вы лезете в сложные вещи и материи не разобравшись в основах к сожалению, прочитайте хотя бы что там пишут по аспнету и entity framework.

Да-да, продолжаем спорить со ссылками на базовые вещи своим бизнес-кейсом, и документацией его на фреймворке, мечтая о бесконечном scope координатора транзакций, и что все будут думать так же). Я уже 100 лет мусолю Ваш пример, а про обратные Вам сказать всё так же нечего. Потому что про scope Вы как не думали, так и не думаете.

docs.microsoft.com/...​n-asp-net-mvc-application

Вот в принципе на пальцах тоже самое о том, что такое optimistic concurrency.

Test concurrency handling
Run the site and click Departments.

Right click the Edit hyperlink for the English department and select Open in new tab, then click the Edit hyperlink for the English department. The two tabs display the same information.

Change a field in the first browser tab and click Save.

The browser shows the Index page with the changed value.

Change a field in the second browser tab and click Save. You see an error message:

Предпоследний абзац dou.ua/...​rums/topic/34317/#2198675

Я уже приводил примеры, когда это невалидно — какой смысл форсить те, которые нужны именно Вам? Если хочется, чтобы scope хендлинга шёл прямо в UI, то это бизнес-специфика, а не требования optimistic concurrency как такового. Поэтому ругать чей-то фреймворк, что он это не поддерживает нет смысла, базируясь на своём одном юз-кейсе. Кому-то нужен будет strong consistency, и т.п., прибегут и будут ругать.

Поэтому ругать чей-то фреймворк, что он это не поддерживает нет смысла, базируясь на своём одном юз-кейсе.

Требования оптимистичной конуренции это требование выставленное на уровне CQRS-paper от Грэга Янга который описывает сам подход, концепция любой write операция предполагает это, любой event store содержит в контрактах ревизию (в пейпере есть контракт)
cqrs.files.wordpress.com/...​010/11/cqrs_documents.pdf (ст 43)
все это надо что бы работал сам event store, rolling snpashot и в конце концов можно было решать конфликты, когда они возникают в данных.
Логично этого требовать от CQRS фреймворка, не?

Если хочется, чтобы scope хендлинга шёл прямо в UI, то это бизнес-специфика, а не требования optimistic concurrency как такового.

Еще раз — вы не понимаете как работает optimistic concurrency, то что вы выше описали как concurrency check, когда бекенд читает последнюю ревизию из базы перед апдейтом закончится покорапченными данными из-за конкуретной работы пользователей, как мы уже разобрались — о чем может быть речь?
Так же я скинул ссылку на документацию MS где это описано самыми простыми словами, очень советую почитать — там же весьма красноречивые примеры кода есть. Не верите мне — пойдите на гитхабе найдите хоть один авторитетный пример, где делают аналогично вам(читают ревизию внутри write/update метода не получая ее как аргумент), optimistic concurrency как коробочное решение есть в куче мест и систем — cosmos, dynamo, elastic, event store..

Требования оптимистичной конуренции это требование выставленное на уровне CQRS-paper от Грэга Янга который описывает сам подход, концепция любой write операция предполагает это, любой event store содержит в контрактах ревизию (в пейпере есть контракт)

Ну, правильно — там же говорится про Event Store, а не надстройку над ним. С чем я как бы и не спорю, о чём уже выше писал — версию же мы передаём куда-то и в моём кейсе.

Еще раз — вы не понимаете как работает optimistic concurrency, то что вы выше описали как concurrency check, когда бекенд читает последнюю ревизию из базы перед апдейтом закончится покорапченными данными из-за конкуретной работы пользователей, как мы уже разобрались — о чем может быть речь?

То, что я выше написал, было ещё до Вашего пассажа про optimistic concurrency — это раз. Претензия была в том, что concurrency handling’а нет, и Вы привели пример, что optimistic concurrency ревизия нужна, чтобы передавать её с фронта, потому что Ваш юз-кейс этого требует. И продолжаете говорить про покоррапченные данные, которые будут покоррапченные только в подобных юз-кейсах.

Но не во всех, где этим можно пренебречь. Если инициатором является внешняя система, которая ничего не знает о нашей версионности агрегатов. Два ивента от внешних систем конкурентно генерируют изменения. Прилетают, каждый читает данные read model вкл. версию, и старается с ней что-то сделать. Валидируем, всё нормально — стараемся записать, передаём версию. Несовпадение — на уровне application уже думаем, что делать с конфликтом: или отдаём ошибку, или придумываем алгоритмы её разруливания. Работает ли optimistic concurrency в этом случае? Да. Передаём мы версию от клиента? Нет. Будут покоррапченные данные? Не будут. И таких юз-кейсов, когда от клиента не придёт ничего, хватает.

По идее, до меня начинает доходить, в чём соль спора. Я не смотрел кусок кода именно ивент стора, и по дефолту предположил, что там всё есть). Поэтому мне казалось, что Вы требуете какой-то внешний интерфейс для передачи ревизии извне (не на уровне Event Store), и не мог понять, зачем этот интерфейс нужен в явном виде, если даже не всегда требуется — пусть каждый сам решает, что накручивать на базовый фреймворк, в зависимости от задач...

Посмотрите внимательно на абстрации фреймворка — aggregate(Должен всегда поддерживать стейт в валидном состоянии с точки зрения бизнесс правил), evenhandler который должен гарантировать целостность в конкуретной среде при применении ивента полученного из команды. Там нет никакого контроля целостности стейта в конкурентной среде. По-умолчанию любой проект веб-сервисов будет предполагать многопользовательский режим и кокурентность. О какой базовой функциональности cqrs фреймворка идет речь, где это не нужно? Однопоточная распределенный веб сервисы? Джоба что делает что-то без вмешательства пользователя? Ну, ок...

— как concurrency ваш фреймворк решает на записи?

Concurrency — это отдельный разговор в зависимости от юз-кейса. Если сделать event-based коммуникацию без всяких CQRS и ES, она же никуда не денется. Поэтому можно смотреть на это с разных сторон для разных кейсов.

К примеру, посылка ивентов:
1. Azure Service Bus поддерживает session ordering.
1. Events Hubs может писать в один и тот же partition, соблюдая ordering.
Обработка ивентов:
1. Ивенты вычитываются последовательно.
2. Закрутить гайки на уровне транзакционности базы.
3. Сделать агрегаты акторами с distributed lock из коробки, обеспечив consistency на уровне приложения.
4. Применять 2-phase lock.
5. Saga’и (можно в виде Durable Functions).

В общем, вариантов очень много.

Concurrency — это отдельный разговор в зависимости от юз-кейса

если фреймворк сам занимается менеджментом состояния агрегата было бы неплохо понимать какой у него есть апи для работы с concurrency; если я начну все описанное делать сам, мне можно будет выкинуть половину функциональности фреймворка.

Тут нет ничего, что решает concurrency как мне видится. Если я этот код запущу в двух разных процессах и начну в каждом обновлять один и тот же агрегат через ваш event handler — последний просто перетрет изменения предыдущего. Я верно понимаю?

Это вроде бы проливает свет немного на Ваши взгляды). Но ведь из-за того, что у разных юз-кейсов свои критерии, и нет смысла имплементировать такое на подобном уровне — придётся учитывать их все. Поэтому я и задал вопрос: а как Вы эти проблемы без фреймворка решаете на «стандартной» архитектуре? Ведь нет разницы, Вы это будете вызывать на разных инстансах функций или на одном бинарнике, задеплоенном на одной виртуалке — возможна практически такая же конкурентность, концептуальной разницы нет. Поэтому такая постановка вопроса не очень понятна.

не правильно, бо на один агрегат два паралельно не запуститься. делівері йде послідовно по session_id на кожного агрегата, це гарантовано на рівні service bus.

теперь понятно. Вы бы сразу написали, что фреймворк предполагает неконкуретную обработку запросов на уровне транспорта сообщений — так было проще для понимания ограничений и возможностей. и для пользователей раз вы уже паблик либу сделали полагаетесь на какие-то неявные pre-requisit внешнего окружения в README.md закинули бы на гитхабе, так как по сути это design gap (паблик апи библиотеки и конфигурация не отражает важные детали специфики ее работы для пользователя, которому об этом надо позаботиться).

saga на durable functions, session ordering per aggregate, storage partition per aggregate. github.com/...​lias_CQRS/tree/master/src

Підписатись на коментарі