Як async/await дійсно працює в C#. Частина 2
Вітаю, мене звати Юрій Рожков. Займаюсь автоматизацією тестування, розробкою тестових фреймворків, по можливості закриваю задачі девелоперів і налаштовую інфраструктуру. Так сталось, що крайні 7 років мій техстек роботи — це .NET/C# і все, з ним пов’язане. Нещодавно зіткнувся з цікавою статтею про async/await, про що й буде далі мова.
Disclaimer: ця стаття є перекладом оригіналу, опублікованого тут. Перша частина перекладу за посиланням.
Вступ задач (tasks)
.NET Framework 4.0 ввів тип System.Threading.Tasks.Task
— структуру даних, котра представляє остаточне завершення асинхронних операцій (інші фреймворки називають це «promise» або «future»). Таска зберігає в собі результат логічного завершення операції. Досить просто.
Але ключове в Тасці, що робить її суттєво кориснішою ніж IAsyncResult
— вбудоване поняття продовження. Ця річ означає, що будь-яка Таска може асинхронно сповістити, коли вона завершується з самою задачею, що займається синхронізацією, щоб гарантувати, що продовження роботи викликається незалежно від того, чи основна задача вже завершена, ще не завершена, або завершується одночасно за запитом.
Чому це важливо? Якщо ви повернетесь до старого АРМ-патерну, то згадаєте 2 основні проблеми.
- Вам необхідно реалізувати свій
IAsyncResult
для кожної операції: не було вбудованого методу, який би кожен міг використати - Крім Begin методу, вам було необхідно знати, що ви хочете виконати після його завершення. Це робить обробку і композиції будь-яких асинхронних реалізацій таким собі челенджем.
На відміну від цього, в Тасці, котра дозволяє пройтись по асинхронній операції і передавати продовження задачі після того, як операція була ініційована. Вам не потрібно передавати це продовження методу, що ініціює операцію. Будь-хто у кого є асинхронні операції, може створювати Таску, і будь-хто, хто опрацьовує асинхронні операції може опрацьовувати Таску, і нічого не треба робити для підключення — Таска стає універсальним підходом, що дозволяє використання і продюсерів і консюмерів для асинхронних операцій. І це повністю змінило .NET.
Щоб трохи розібратись, давайте замість розбирання кода для Таски реалізуємо одну річ. Зрештою, це всього лише структура даних, що координує сигнал по завершенню роботи. Почнемо з пари полів:
class MyTask { private bool _completed; private Exception? _error; private Action<MyTask>? _continuation; private ExecutionContext? _ec; ... }
Поле _completed
— завершена таска, _error
— зберігання помилки при виконанні таски. Якщо ми імплементуємо MyTask<TResult>
, то також буде private TResult _result -
вдалий результат операції.
Поки що це в цілому виглядає як імплементація IAsyncResult
. Але вся суть в полі _continuation,
де ми підтримуємо одинарне продовження. Це делегат, котрий буде викликаний після завершення задачі.
Тут трохи зупинимось. Як зазначалось, одна з переваг Таски — можливість постачання роботи на продовження (колбек) після того, як операція ініційована. Нам для цього потрібен окремий метод:
public void ContinueWith(Action<MyTask> action) { lock (this) { if (_completed) { ThreadPool.QueueUserWorkItem(_ => action(this)); } else if (_continuation is not null) { throw new InvalidOperationException("Unlike Task, this implementation only supports a single continuation."); } else { _continuation = action; _ec = ExecutionContext.Capture(); } } }
Якщо задача була помічена як завершена до моменту виклику ContinueWith
, ContinueWith
додає в чергу виконання делегату. Інакше, метод зберігає делегат, і продовження може бути додано в чергу, коли задача завершується (також зберігається ExecutionContext,
і використовує, коли делегат викликається). Досить просто.
Нам потрібно помітити MyTask
як завершену, тобто будь-яка асинхронна операція, яку він представляє, завершена. Для цього створимо 2 методи, SetResult — позначити успішне завершення, SetException — позначити завершення з помилкою:
public void SetResult() => Complete(null); public void SetException(Exception error) => Complete(error); private void Complete(Exception? error) { lock (this) { if (_completed) { throw new InvalidOperationException("Already completed"); } _error = error; _completed = true; if (_continuation is not null) { ThreadPool.QueueUserWorkItem(_ => { if (_ec is not null) { ExecutionContext.Run(_ec, _ => _continuation(this), null); } else { _continuation(this); } }); } } }
Ми зберігаємо помилку, позначаємо задачу як таку, що завершується, і якщо продовження було попередньо зареєстроване, додаємо його в чергу.
Нарешті, нам потрібно поширити ексепшен, котрий міг виникнути в задачі (якщо це був MyTask<T>
, повертати її _result
); полегшити певні сценарії — ми також дозволяємо цьому методу блокувати очікування задачі на завершення, котрі ми можемо реалізувати в контексті ContinueWith
.
public void Wait() { ManualResetEventSlim? mres = null; lock (this) { if (!_completed) { mres = new ManualResetEventSlim(); ContinueWith(_ => mres.Set()); } } mres?.Wait(); if (_error is not null) { ExceptionDispatchInfo.Throw(_error); } }
Це все. Будьте впевнені, реальна Таска набагато складніша, з ефективнішою реалізацією, з підтримкою будь-якої кількості продовжень, з безліччю варіантів, як воно має поводитись (наприклад, чи мають продовження бути додані в чергу або викликані синхронно), з можливістю зберігати декілька ексепшенів замість одного, з розумінням скасовування, з купою хелпер-методів тощо.
Ви могли помітити в нашій реалізації MyTask
публічні методи SetResult
/SetException
. В звичайній Тасці також є ці методи, внутрішні, з типом System.Threading.Tasks.TaskCompletionSource,
який служить «продюсером» для задачі та її завершення. Такі методи відокремлюють завершення методів від споживання.
Ви можете передавати Таску, не турбуючись про те, що вона виконана вами; сигнал на завершення деталі створення Таски і право на її завершення через TaskCompletionSource
(CancellationToken і CancellationTokenSource
слідують схожому паттерну: CancellationToken
врапер для CancellationTokenSource,
що обслуговує частину обробки сигналу скасування, але без можливості його створювати).
Можемо імплементувати свій MyTask.WhenAll
:
public static MyTask WhenAll(MyTask t1, MyTask t2) { var t = new MyTask(); int remaining = 2; Exception? e = null; Action<MyTask> continuation = completed => { e ??= completed._error; // just store a single exception for simplicity if (Interlocked.Decrement(ref remaining) == 0) { if (e is not null) t.SetException(e); else t.SetResult(); } }; t1.ContinueWith(continuation); t2.ContinueWith(continuation); return t; }
Або MyTask.Run
:
public static MyTask Run(Action action) { var t = new MyTask(); ThreadPool.QueueUserWorkItem(_ => { try { action(); t.SetResult(); } catch (Exception e) { t.SetException(e); } }); return t; }
Або MyTask.Delay
:
public static MyTask Delay(TimeSpan delay) { var t = new MyTask(); var timer = new Timer(_ => t.SetResult()); timer.Change(delay, Timeout.InfiniteTimeSpan); return t; }
Зрозуміло. З Таскою усі попередні асинхронні патерни .NET залишаються у минулому. Всюди, де ми раніше імплементували АРМ або ЕАР, тепер методи, що повертають Таску.
Вступ ValueTasks
Таска продовжує бути робочою лошадкою в .NET дотепер, з новими методами, що всюди повертають Task і Task.
Однак, Таска — це клас, а значить при створенні вимагає ресурсів. В більшості випадків, один додатковий розподіл ресурсів не впливає істотно на продуктивність усіх операцій, крім найчутливіших.
Однак, пам’ятаємо, що синхронне завершення асинхронних операцій доволі поширене. Stream.ReadAsync повертає Task, але якщо ви читаєте з BufferedStream, то є шанс, що багато читань завершаться синхронно, з причини того, що достатньо дістати дані з in-memory буфера ніж виконувати системні виклики для реального I/O.
Небажано резервувати окремий об’єкт тільки для повернення самих даних (що ми також спостерігали з АПМ). Не-дженерік метод може повернути сінглтон вже завершеної таски, водночас той самий сінглтон передає сама Таска у вигляді Task.CompletedTask. А для дженерік метода Task неможливо закешувати всі можливі TResult.
Що тут можна зробити? Можна закешувати окремі Task. Наприклад, Task має 2 значення для кеша — одне для true і одне для false. Для Task замість кешування мільярдів значень, заведено кешувати декілька з них, скажімо, від −1 до 8.
Або, у разі довільного типу, кешуємо значення за замовчуванням default(TResult) для кожного відповідного типу. Насправді Task.FromResult саме так і працює, використовуючи невеликий кеш або резервуючи новий Task для окремих значень.
Інакше буде у випадку роботи з Stream.ReadAsync, коли має сенс викликати його декілька разів з однаковою кількістю байтів для читання, і в результаті повертається однакове int значення. В такому разі, щоб уникнути багаторазового резервування, Stream
типи кешують останній Task, що був повернутий, і при наступному читанні, якщо операція завершується синхронно, вдало і з тим же самим результатом, повертається теж саме значення Task.
Але що робити в інших випадках, коли треба уникнути такого резервування задля збереження продуктивності? Тут на допомогу приходить ValueTask<TResult>
(більше деталей тут). Починає як незалежна одиниця між TResult
та Task<TResult>
, і наприкінці, ігноруючи навороти, дає миттєво результат або обіцянку (проміс) про результат у майбутньому:
public readonly struct ValueTask<TResult> { private readonly Task<TResult>? _task; private readonly TResult _result; … }
Метод повертає ValueTask замість Task, і внаслідок більшого типу, що повертається, уникає резервування Task, якщо TResult відомий на час коли його треба повернути.
Однак, є окремі кейси, вибагливі до продуктивності, коли треба уникати резервування Task навіть при асинхронному завершені. Наприклад, Socket знаходиться внизу взаємодії через мережу, методи SendAsync та ReceiveAsync у сокетах завершуються синхронно та асинхронно. Було б непогано відправку і отримання даних по сокету зробити без додаткового резервування пам’яті, незалежно від синхронної / асинхронної операції.
Так ми отримаємо System.Threading.Tasks.Sources.IValueTaskSource<TResult>
:
public interface IValueTaskSource<out TResult> { ValueTaskSourceStatus GetStatus(short token); void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags); TResult GetResult(short token); }
Цей інтерфейс дозволяє при імплементації прокинути свій об’єкт для ValueTask, дозволяючи реалізувати методі як GetResult, що повертає результат операції і OnCompleted, щоб підключити продовження після операції. Завдяки цьому в ValueTask з’явились невеличкі зміни, з полем object? _obj замість Task? _task:
public readonly struct ValueTask<TResult> { private readonly object? _obj; private readonly TResult _result; … }
_task — Task або null, _obj може бути IValueTaskSource. Коли Task помічається як завершена, вона залишається в цьому стані без повернення в незавершений стан. На відміну від цього, IValueTaskSource<TResult>
має повний контроль над реалізацією, і може міняти стан на завершений-незавершений, та згідно з контрактом ValueTask об’єкт може бути опрацьований тільки раз, і не має знати про зміни після опрацювання (для цього є окреме правило CA2012).
Це дозволяє тому ж Socket поєднувати інстанси IValueTaskSource для повторних викликів, до 2 інстансів кешуються — один для читання і один для запису, чого достатньо у 99,999% випадках.
Треба також згадати звичайний не-дженерік ValueTask, котрий надає невеликий бенефіт у продуктивності, оскільки та ж сама умова може бути представлена з Task.CompletedTask. Коли ми переймаємось про уникнення резервування ресурсів при асинхронному завершені, то це також стосується і не-дженерік методів. Поряд з IValueTaskSource ми також маємо і IValueTaskSource і ValueTask.
У нас є Task, Task, ValueTask, та ValueTask, з котрими ми вміємо працювати, створювати довільні асинхронні операції та опрацьовувати роботу після завершення цих операцій. Ми можемо це робити до та після операції.
Але ж ця робота після завершення досі в колбеку. Як ми можемо це виправити?
C# ітератори в поміч
Промінь надії прийшов з появою ітераторів в C# 2.0. Один з прикладів — IEnumerable або IEnumerator. Якщо вам необхідно створити колекцію для перерахування чисел Фібоначчі, то можна написати наступне:
public static IEnumerable<int> Fib() { int prev = 0, next = 1; yield return prev; yield return next; while (true) { int sum = prev + next; yield return sum; prev = next; next = sum; } }
І перерахувати значення:
foreach (int i in Fib()) { if (i > 100) break; Console.Write($"{i} "); }
Або скоротити за допомогою System.Linq.Enumerable
:
foreach (int i in Fib().Take(12)) { Console.Write($"{i} "); }
Або з використанням IEnumerator<T>
:
using IEnumerator<int> e = Fib().GetEnumerator(); while (e.MoveNext()) { int i = e.Current; if (i > 100) break; Console.Write($"{i} "); }
У всіх випадках результатом буде:
0 1 1 2 3 5 8 13 21 34 55 89
Суть в тому, що нам необхідно викликати метод Fib декілька разів. Викликаємо MoveNext
, він потрапляє у метод, який виконується допоки не досягає yield return
, і в цей момент має повернутись true
та наступний доступ до Current
має повернути отримане значення.
Після цього викликаємо MoveNext
знову, і необхідно забрати Fib відразу після зупинки, з повним станом з попереднього виклику. C# надає ітератори як ефективні підпрограми, перетворюючи Fib ітератор в повноцінну машину стану:
Вся логіка Fib тепер всередині методу MoveNext, як частина переходів, що дозволяють реалізації відділятись до того як зупинилась останній раз. Ті локальні зміні що ми записали в функції, як prev, next і sum, були «посунуті» у поля лічильника (enumerator), таким чином вони можуть зберігатись між викликами MoveNext.
Зауважте, що попередній код згенерований компілятором, не буде компілюватись як є. C# компілятор синтезує «непромовні» імена, які є нормальними для IL, але не для C#.
У попередньому прикладі перерахування викликано вручну з використанням IEnumerator<T>
. На цьому рівні, ми вручну викликаємо MoveNext()
, з вибором часу для входу у відпрограму. Але припустимо, що замість цього ми зробимо виклик MoveNext
частиною роботи після завершення, що виконуються після завершення асинхронної операції. А якщо виконати yield return
на асинхронній операції, тим самим додамо продовження до отриманого об’єкта, щоб потім виконати MoveNext
:
Далі стає цікавіше. Нам дається безліч задач. Щоразу з MoveNext
переходимо до наступної таски, потім підключаємо продовження до цієї таски; коли Таска завершиться, це обернеться тим, що робить MoveNext
— взяти наступну Таску тощо.
Це базується на ідеї Таски як єдине відображення будь-якої асинхронної операції, і перерахування може бути послідовністю будь-яких асинхронних операцій. Ця послідовність надходить, звичайно, з ітератора. Пам’ятаєте, наскільки жахливою була реалізація через CopyStreamToStream
? Оцініть наступне:
Це вже набагато краще. Ми викликаємо допоміжний метод IterateAsync
, і перелік, що ми заповнюємо, походить з ітератора, котрий контролює весь процес копіювання. Він викликає Stream.ReadAsync,
потім поступово передає дані в Таску
; ця таска є те, що буде передано в IterateAsync
після виклику MoveNext
, і IterateAsync
додаватиме продовження в цю Таску, котра після завершення викличе MoveNext
і повернеться в ітератор після yield.
У цей момент логіка Impl
отримує результат методу, викликає WriteAsync
і знову наповнює Таску. І так далі.
Це і є початком async
/await
в C# та .NET. Десь близько 95% логіки підтримки ітераторів і async
/await
у компіляторі C# є спільним. Інший синтаксис, типи, але фундаментально все те саме. Сфокусуйтесь на yield return,
і замість них ви побачите await.
Насправді деякі досвідчені розробники використовували ітератори таким чином для асинхронного програмування до того, як світ побачив async
/await.
Схожа трансформація відбулась в експериментальній мові
Axum, що стало натхненням для підтримки async в C#.
Немає коментарів
Додати коментар Підписатись на коментаріВідписатись від коментарів